2024-03-01 10:31:02 +00:00
|
|
|
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
|
|
|
|
package datacoord
|
|
|
|
|
|
|
|
import (
|
2024-11-26 11:46:34 +00:00
|
|
|
"context"
|
2024-10-28 02:13:29 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/hashicorp/golang-lru/v2/expirable"
|
|
|
|
"golang.org/x/exp/maps"
|
|
|
|
|
2024-11-18 02:46:31 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/json"
|
2024-03-01 10:31:02 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/metastore"
|
2024-05-15 08:33:34 +00:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/lock"
|
2024-10-09 06:41:20 +00:00
|
|
|
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
2024-03-01 10:31:02 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type ImportMeta interface {
|
2024-11-26 11:46:34 +00:00
|
|
|
AddJob(ctx context.Context, job ImportJob) error
|
|
|
|
UpdateJob(ctx context.Context, jobID int64, actions ...UpdateJobAction) error
|
|
|
|
GetJob(ctx context.Context, jobID int64) ImportJob
|
|
|
|
GetJobBy(ctx context.Context, filters ...ImportJobFilter) []ImportJob
|
|
|
|
CountJobBy(ctx context.Context, filters ...ImportJobFilter) int
|
|
|
|
RemoveJob(ctx context.Context, jobID int64) error
|
|
|
|
|
|
|
|
AddTask(ctx context.Context, task ImportTask) error
|
|
|
|
UpdateTask(ctx context.Context, taskID int64, actions ...UpdateAction) error
|
|
|
|
GetTask(ctx context.Context, taskID int64) ImportTask
|
|
|
|
GetTaskBy(ctx context.Context, filters ...ImportTaskFilter) []ImportTask
|
|
|
|
RemoveTask(ctx context.Context, taskID int64) error
|
|
|
|
TaskStatsJSON(ctx context.Context) string
|
2024-03-01 10:31:02 +00:00
|
|
|
}
|
|
|
|
|
2024-10-28 02:13:29 +00:00
|
|
|
type importTasks struct {
|
|
|
|
tasks map[int64]ImportTask
|
|
|
|
taskStats *expirable.LRU[int64, ImportTask]
|
|
|
|
}
|
|
|
|
|
|
|
|
func newImportTasks() *importTasks {
|
|
|
|
return &importTasks{
|
|
|
|
tasks: make(map[int64]ImportTask),
|
2024-11-12 09:44:29 +00:00
|
|
|
taskStats: expirable.NewLRU[UniqueID, ImportTask](64, nil, time.Minute*30),
|
2024-10-28 02:13:29 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (t *importTasks) get(taskID int64) ImportTask {
|
|
|
|
ret, ok := t.tasks[taskID]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
func (t *importTasks) add(task ImportTask) {
|
|
|
|
t.tasks[task.GetTaskID()] = task
|
|
|
|
t.taskStats.Add(task.GetTaskID(), task)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (t *importTasks) remove(taskID int64) {
|
|
|
|
task, ok := t.tasks[taskID]
|
|
|
|
if ok {
|
|
|
|
delete(t.tasks, taskID)
|
|
|
|
t.taskStats.Add(task.GetTaskID(), task)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (t *importTasks) listTasks() []ImportTask {
|
|
|
|
return maps.Values(t.tasks)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (t *importTasks) listTaskStats() []ImportTask {
|
|
|
|
return t.taskStats.Values()
|
|
|
|
}
|
2024-03-01 10:31:02 +00:00
|
|
|
|
2024-10-28 02:13:29 +00:00
|
|
|
type importMeta struct {
|
|
|
|
mu lock.RWMutex // guards jobs and tasks
|
|
|
|
jobs map[int64]ImportJob
|
|
|
|
tasks *importTasks
|
2024-03-01 10:31:02 +00:00
|
|
|
catalog metastore.DataCoordCatalog
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func NewImportMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (ImportMeta, error) {
|
|
|
|
restoredPreImportTasks, err := catalog.ListPreImportTasks(ctx)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2024-11-26 11:46:34 +00:00
|
|
|
restoredImportTasks, err := catalog.ListImportTasks(ctx)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2024-11-26 11:46:34 +00:00
|
|
|
restoredJobs, err := catalog.ListImportJobs(ctx)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-10-28 02:13:29 +00:00
|
|
|
tasks := newImportTasks()
|
|
|
|
|
2024-03-01 10:31:02 +00:00
|
|
|
for _, task := range restoredPreImportTasks {
|
2024-10-28 02:13:29 +00:00
|
|
|
tasks.add(&preImportTask{
|
2024-03-01 10:31:02 +00:00
|
|
|
PreImportTask: task,
|
2024-10-09 06:41:20 +00:00
|
|
|
tr: timerecord.NewTimeRecorder("preimport task"),
|
2024-10-28 02:13:29 +00:00
|
|
|
})
|
2024-03-01 10:31:02 +00:00
|
|
|
}
|
|
|
|
for _, task := range restoredImportTasks {
|
2024-10-28 02:13:29 +00:00
|
|
|
tasks.add(&importTask{
|
2024-03-01 10:31:02 +00:00
|
|
|
ImportTaskV2: task,
|
2024-10-09 06:41:20 +00:00
|
|
|
tr: timerecord.NewTimeRecorder("import task"),
|
2024-10-28 02:13:29 +00:00
|
|
|
})
|
2024-03-01 10:31:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
jobs := make(map[int64]ImportJob)
|
|
|
|
for _, job := range restoredJobs {
|
|
|
|
jobs[job.GetJobID()] = &importJob{
|
|
|
|
ImportJob: job,
|
2024-10-09 06:41:20 +00:00
|
|
|
tr: timerecord.NewTimeRecorder("import job"),
|
2024-03-01 10:31:02 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return &importMeta{
|
|
|
|
jobs: jobs,
|
|
|
|
tasks: tasks,
|
|
|
|
catalog: catalog,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) AddJob(ctx context.Context, job ImportJob) error {
|
2024-03-01 10:31:02 +00:00
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
2024-11-26 11:46:34 +00:00
|
|
|
err := m.catalog.SaveImportJob(ctx, job.(*importJob).ImportJob)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
m.jobs[job.GetJobID()] = job
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) UpdateJob(ctx context.Context, jobID int64, actions ...UpdateJobAction) error {
|
2024-03-01 10:31:02 +00:00
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
if job, ok := m.jobs[jobID]; ok {
|
|
|
|
updatedJob := job.Clone()
|
|
|
|
for _, action := range actions {
|
|
|
|
action(updatedJob)
|
|
|
|
}
|
2024-11-26 11:46:34 +00:00
|
|
|
err := m.catalog.SaveImportJob(ctx, updatedJob.(*importJob).ImportJob)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
m.jobs[updatedJob.GetJobID()] = updatedJob
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) GetJob(ctx context.Context, jobID int64) ImportJob {
|
2024-03-01 10:31:02 +00:00
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
return m.jobs[jobID]
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) GetJobBy(ctx context.Context, filters ...ImportJobFilter) []ImportJob {
|
2024-03-01 10:31:02 +00:00
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
2024-10-23 08:01:28 +00:00
|
|
|
return m.getJobBy(filters...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *importMeta) getJobBy(filters ...ImportJobFilter) []ImportJob {
|
2024-03-01 10:31:02 +00:00
|
|
|
ret := make([]ImportJob, 0)
|
|
|
|
OUTER:
|
|
|
|
for _, job := range m.jobs {
|
|
|
|
for _, f := range filters {
|
|
|
|
if !f(job) {
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ret = append(ret, job)
|
|
|
|
}
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) CountJobBy(ctx context.Context, filters ...ImportJobFilter) int {
|
2024-10-23 08:01:28 +00:00
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
return len(m.getJobBy(filters...))
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) RemoveJob(ctx context.Context, jobID int64) error {
|
2024-03-01 10:31:02 +00:00
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
if _, ok := m.jobs[jobID]; ok {
|
2024-11-26 11:46:34 +00:00
|
|
|
err := m.catalog.DropImportJob(ctx, jobID)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
delete(m.jobs, jobID)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) AddTask(ctx context.Context, task ImportTask) error {
|
2024-03-01 10:31:02 +00:00
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
|
|
|
switch task.GetType() {
|
|
|
|
case PreImportTaskType:
|
2024-11-26 11:46:34 +00:00
|
|
|
err := m.catalog.SavePreImportTask(ctx, task.(*preImportTask).PreImportTask)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2024-10-28 02:13:29 +00:00
|
|
|
m.tasks.add(task)
|
2024-03-01 10:31:02 +00:00
|
|
|
case ImportTaskType:
|
2024-11-26 11:46:34 +00:00
|
|
|
err := m.catalog.SaveImportTask(ctx, task.(*importTask).ImportTaskV2)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2024-10-28 02:13:29 +00:00
|
|
|
m.tasks.add(task)
|
2024-03-01 10:31:02 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) UpdateTask(ctx context.Context, taskID int64, actions ...UpdateAction) error {
|
2024-03-01 10:31:02 +00:00
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
2024-10-28 02:13:29 +00:00
|
|
|
if task := m.tasks.get(taskID); task != nil {
|
2024-03-01 10:31:02 +00:00
|
|
|
updatedTask := task.Clone()
|
|
|
|
for _, action := range actions {
|
|
|
|
action(updatedTask)
|
|
|
|
}
|
|
|
|
switch updatedTask.GetType() {
|
|
|
|
case PreImportTaskType:
|
2024-11-26 11:46:34 +00:00
|
|
|
err := m.catalog.SavePreImportTask(ctx, updatedTask.(*preImportTask).PreImportTask)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2024-10-28 02:13:29 +00:00
|
|
|
m.tasks.add(updatedTask)
|
2024-03-01 10:31:02 +00:00
|
|
|
case ImportTaskType:
|
2024-11-26 11:46:34 +00:00
|
|
|
err := m.catalog.SaveImportTask(ctx, updatedTask.(*importTask).ImportTaskV2)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2024-10-28 02:13:29 +00:00
|
|
|
m.tasks.add(updatedTask)
|
2024-03-01 10:31:02 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) GetTask(ctx context.Context, taskID int64) ImportTask {
|
2024-03-01 10:31:02 +00:00
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
2024-10-28 02:13:29 +00:00
|
|
|
return m.tasks.get(taskID)
|
2024-03-01 10:31:02 +00:00
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) GetTaskBy(ctx context.Context, filters ...ImportTaskFilter) []ImportTask {
|
2024-03-01 10:31:02 +00:00
|
|
|
m.mu.RLock()
|
|
|
|
defer m.mu.RUnlock()
|
|
|
|
ret := make([]ImportTask, 0)
|
|
|
|
OUTER:
|
2024-10-28 02:13:29 +00:00
|
|
|
for _, task := range m.tasks.listTasks() {
|
2024-03-01 10:31:02 +00:00
|
|
|
for _, f := range filters {
|
|
|
|
if !f(task) {
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ret = append(ret, task)
|
|
|
|
}
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) RemoveTask(ctx context.Context, taskID int64) error {
|
2024-03-01 10:31:02 +00:00
|
|
|
m.mu.Lock()
|
|
|
|
defer m.mu.Unlock()
|
2024-10-28 02:13:29 +00:00
|
|
|
if task := m.tasks.get(taskID); task != nil {
|
2024-03-01 10:31:02 +00:00
|
|
|
switch task.GetType() {
|
|
|
|
case PreImportTaskType:
|
2024-11-26 11:46:34 +00:00
|
|
|
err := m.catalog.DropPreImportTask(ctx, taskID)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
case ImportTaskType:
|
2024-11-26 11:46:34 +00:00
|
|
|
err := m.catalog.DropImportTask(ctx, taskID)
|
2024-03-01 10:31:02 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2024-10-28 02:13:29 +00:00
|
|
|
m.tasks.remove(taskID)
|
2024-03-01 10:31:02 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
2024-10-28 02:13:29 +00:00
|
|
|
|
2024-11-26 11:46:34 +00:00
|
|
|
func (m *importMeta) TaskStatsJSON(ctx context.Context) string {
|
2024-10-28 02:13:29 +00:00
|
|
|
tasks := m.tasks.listTaskStats()
|
|
|
|
|
|
|
|
ret, err := json.Marshal(tasks)
|
|
|
|
if err != nil {
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
return string(ret)
|
|
|
|
}
|