enhance: Pre-allocate ids for import (#33958)

The import flow depends on syncTask, which in turn relies on the
allocator. This PR pre-allocates the IDs needed by the import syncTask.
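
As a rough sketch of the idea (not code from this PR, and the range values are made up), the new localAllocator introduced below lets the DataNode serve ID requests from a pre-allocated range without any further RPC to the remote allocator:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/allocator"
)

func main() {
	// Hypothetical range handed out up front, e.g. carried in AutoIDRange.
	local := allocator.NewLocalAllocator(1000, 2000)

	// Row IDs for a 100-row batch come straight from the local range.
	start, end, err := local.Alloc(100)
	if err != nil {
		panic(err)
	}
	fmt.Printf("rowIDs: [%d, %d)\n", start, end)

	// Further IDs (e.g. a logID) are drawn one at a time from the same range.
	logID, err := local.AllocOne()
	if err != nil {
		panic(err)
	}
	fmt.Println("next logID:", logID)
}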

issue: https://github.com/milvus-io/milvus/issues/33957

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2024-07-07 21:26:14 +08:00 committed by GitHub
parent f4dd7c7efb
commit 4e5f1d5f75
21 changed files with 276 additions and 106 deletions

View File

@ -0,0 +1,58 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package allocator

import (
	"fmt"
	"sync"
)

// localAllocator implements the Interface.
// It is constructed from a range of IDs.
// Once all IDs are allocated, an error will be returned.
type localAllocator struct {
	mu      sync.Mutex
	idStart int64
	idEnd   int64
}

func NewLocalAllocator(start, end int64) Interface {
	return &localAllocator{
		idStart: start,
		idEnd:   end,
	}
}

func (a *localAllocator) Alloc(count uint32) (int64, int64, error) {
	cnt := int64(count)
	if cnt <= 0 {
		return 0, 0, fmt.Errorf("non-positive count is not allowed, count=%d", cnt)
	}
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.idStart+cnt > a.idEnd {
		return 0, 0, fmt.Errorf("ID is exhausted, start=%d, end=%d, count=%d", a.idStart, a.idEnd, cnt)
	}
	start := a.idStart
	a.idStart += cnt
	return start, start + cnt, nil
}

func (a *localAllocator) AllocOne() (int64, error) {
	start, _, err := a.Alloc(1)
	return start, err
}

View File

@ -0,0 +1,72 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package allocator

import (
	"sync"
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestLocalAllocator(t *testing.T) {
	t.Run("basic", func(t *testing.T) {
		alloc := NewLocalAllocator(100, 200)
		for i := 0; i < 10; i++ {
			start, end, err := alloc.Alloc(10)
			assert.NoError(t, err)
			assert.Equal(t, int64(100+i*10), start)
			assert.Equal(t, int64(100+(i+1)*10), end)
		}
		_, _, err := alloc.Alloc(10)
		assert.Error(t, err)
		_, err = alloc.AllocOne()
		assert.Error(t, err)
		_, _, err = alloc.Alloc(0)
		assert.Error(t, err)
	})

	t.Run("concurrent", func(t *testing.T) {
		idMap := typeutil.NewConcurrentMap[int64, struct{}]()
		alloc := NewLocalAllocator(111, 1000111)
		fn := func(wg *sync.WaitGroup) {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				start, end, err := alloc.Alloc(10)
				assert.NoError(t, err)
				for j := start; j < end; j++ {
					assert.False(t, idMap.Contain(j)) // check no duplicated id
					idMap.Insert(j, struct{}{})
				}
			}
		}
		wg := &sync.WaitGroup{}
		for i := 0; i < 1000; i++ {
			wg.Add(1)
			go fn(wg)
		}
		wg.Wait()
		assert.Equal(t, 1000000, idMap.Len())
		// should be exhausted
		assert.Equal(t, alloc.(*localAllocator).idEnd, alloc.(*localAllocator).idStart)
		_, err := alloc.AllocOne()
		assert.Error(t, err)
		t.Logf("%v", err)
	})
}

View File

@ -195,13 +195,21 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all
if err != nil {
return nil, err
}
totalRows := lo.SumBy(task.GetFileStats(), func(stat *datapb.ImportFileStats) int64 {
return stat.GetTotalRows()
})
idBegin, idEnd, err := alloc.allocN(totalRows)
var (
// Allocated IDs are used for rowID and the BEGINNING of the logID.
allocNum = totalRows + 1
)
idBegin, idEnd, err := alloc.allocN(allocNum)
if err != nil {
return nil, err
}
importFiles := lo.Map(task.GetFileStats(), func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
return fileStat.GetImportFile()
})
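
In other words, the coordinator now reserves one ID per row plus a single extra ID that seeds the logID sequence on the DataNode. A rough worked example of the reservation arithmetic (the numbers are hypothetical, not taken from the PR):

package main

import "fmt"

func main() {
	// Hypothetical import of 1,000,000 rows.
	totalRows := int64(1_000_000)

	// One ID per row, plus one extra ID that seeds the logID sequence.
	allocNum := totalRows + 1

	// If the coordinator's allocator hands back idBegin = 5000, the reserved
	// range is [idBegin, idEnd) = [5000, 1005001).
	idBegin := int64(5000)
	idEnd := idBegin + allocNum
	fmt.Printf("reserved %d IDs: [%d, %d)\n", allocNum, idBegin, idEnd)
}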

View File

@ -263,7 +263,7 @@ func (node *DataNode) Init() error {
}
node.chunkManager = chunkManager
syncMgr, err := syncmgr.NewSyncManager(node.chunkManager, node.allocator)
syncMgr, err := syncmgr.NewSyncManager(node.chunkManager)
if err != nil {
initError = err
log.Error("failed to create sync manager", zap.Error(err))

View File

@ -94,7 +94,7 @@ func NewIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNod
node.broker = broker
node.timeTickSender = util.NewTimeTickSender(broker, 0)
syncMgr, _ := syncmgr.NewSyncManager(node.chunkManager, node.allocator)
syncMgr, _ := syncmgr.NewSyncManager(node.chunkManager)
node.syncMgr = syncMgr
node.writeBufferManager = writebuffer.NewManager(syncMgr)

View File

@ -379,7 +379,7 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
}
preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm)
s.manager.Add(preimportTask)
err = preimportTask.(*PreImportTask).readFileStat(s.reader, preimportTask, 0)
err = preimportTask.(*PreImportTask).readFileStat(s.reader, 0)
s.NoError(err)
}
@ -431,7 +431,7 @@ func (s *SchedulerSuite) TestScheduler_ImportFile() {
}
importTask := NewImportTask(importReq, s.manager, s.syncMgr, s.cm)
s.manager.Add(importTask)
err = importTask.(*ImportTask).importFile(s.reader, importTask)
err = importTask.(*ImportTask).importFile(s.reader)
s.NoError(err)
}

View File

@ -19,6 +19,7 @@ package importv2
import (
"context"
"io"
"math"
"time"
"github.com/cockroachdb/errors"
@ -26,6 +27,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -45,6 +47,7 @@ type ImportTask struct {
segmentsInfo map[int64]*datapb.ImportSegmentInfo
req *datapb.ImportRequest
allocator allocator.Interface
manager TaskManager
syncMgr syncmgr.SyncManager
cm storage.ChunkManager
@ -62,6 +65,8 @@ func NewImportTask(req *datapb.ImportRequest,
if importutilv2.IsBackup(req.GetOptions()) {
UnsetAutoID(req.GetSchema())
}
// Setting end as math.MaxInt64 to incrementally allocate logID.
alloc := allocator.NewLocalAllocator(req.GetAutoIDRange().GetBegin(), math.MaxInt64)
task := &ImportTask{
ImportTaskV2: &datapb.ImportTaskV2{
JobID: req.GetJobID(),
@ -73,6 +78,7 @@ func NewImportTask(req *datapb.ImportRequest,
cancel: cancel,
segmentsInfo: make(map[int64]*datapb.ImportSegmentInfo),
req: req,
allocator: alloc,
manager: manager,
syncMgr: syncMgr,
cm: cm,
@ -131,7 +137,7 @@ func (t *ImportTask) Execute() []*conc.Future[any] {
req := t.req
fn := func(file *internalpb.ImportFile) error {
reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, t.req.GetOptions(), bufferSize)
reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, req.GetOptions(), bufferSize)
if err != nil {
log.Warn("new reader failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
@ -139,7 +145,7 @@ func (t *ImportTask) Execute() []*conc.Future[any] {
}
defer reader.Close()
start := time.Now()
err = t.importFile(reader, t)
err = t.importFile(reader)
if err != nil {
log.Warn("do import failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
@ -162,8 +168,7 @@ func (t *ImportTask) Execute() []*conc.Future[any] {
return futures
}
func (t *ImportTask) importFile(reader importutilv2.Reader, task Task) error {
iTask := task.(*ImportTask)
func (t *ImportTask) importFile(reader importutilv2.Reader) error {
syncFutures := make([]*conc.Future[struct{}], 0)
syncTasks := make([]syncmgr.Task, 0)
for {
@ -174,15 +179,15 @@ func (t *ImportTask) importFile(reader importutilv2.Reader, task Task) error {
}
return err
}
err = AppendSystemFieldsData(iTask, data)
err = AppendSystemFieldsData(t, data)
if err != nil {
return err
}
hashedData, err := HashData(iTask, data)
hashedData, err := HashData(t, data)
if err != nil {
return err
}
fs, sts, err := t.sync(iTask, hashedData)
fs, sts, err := t.sync(hashedData)
if err != nil {
return err
}
@ -194,34 +199,34 @@ func (t *ImportTask) importFile(reader importutilv2.Reader, task Task) error {
return err
}
for _, syncTask := range syncTasks {
segmentInfo, err := NewImportSegmentInfo(syncTask, iTask.metaCaches)
segmentInfo, err := NewImportSegmentInfo(syncTask, t.metaCaches)
if err != nil {
return err
}
t.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo))
log.Info("sync import data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...)
t.manager.Update(t.GetTaskID(), UpdateSegmentInfo(segmentInfo))
log.Info("sync import data done", WrapLogFields(t, zap.Any("segmentInfo", segmentInfo))...)
}
return nil
}
func (t *ImportTask) sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
log.Info("start to sync import data", WrapLogFields(task)...)
func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
log.Info("start to sync import data", WrapLogFields(t)...)
futures := make([]*conc.Future[struct{}], 0)
syncTasks := make([]syncmgr.Task, 0)
for channelIdx, datas := range hashedData {
channel := task.GetVchannels()[channelIdx]
channel := t.GetVchannels()[channelIdx]
for partitionIdx, data := range datas {
if data.GetRowNum() == 0 {
continue
}
partitionID := task.GetPartitionIDs()[partitionIdx]
segmentID := PickSegment(task.req.GetRequestSegments(), channel, partitionID)
syncTask, err := NewSyncTask(task.ctx, task.metaCaches, task.req.GetTs(),
segmentID, partitionID, task.GetCollectionID(), channel, data, nil)
partitionID := t.GetPartitionIDs()[partitionIdx]
segmentID := PickSegment(t.req.GetRequestSegments(), channel, partitionID)
syncTask, err := NewSyncTask(t.ctx, t.allocator, t.metaCaches, t.req.GetTs(),
segmentID, partitionID, t.GetCollectionID(), channel, data, nil)
if err != nil {
return nil, nil, err
}
future := t.syncMgr.SyncData(task.ctx, syncTask)
future := t.syncMgr.SyncData(t.ctx, syncTask)
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"math"
"time"
"github.com/cockroachdb/errors"
@ -27,6 +28,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -46,6 +48,7 @@ type L0ImportTask struct {
segmentsInfo map[int64]*datapb.ImportSegmentInfo
req *datapb.ImportRequest
allocator allocator.Interface
manager TaskManager
syncMgr syncmgr.SyncManager
cm storage.ChunkManager
@ -58,6 +61,8 @@ func NewL0ImportTask(req *datapb.ImportRequest,
cm storage.ChunkManager,
) Task {
ctx, cancel := context.WithCancel(context.Background())
// Setting end as math.MaxInt64 to incrementally allocate logID.
alloc := allocator.NewLocalAllocator(req.GetAutoIDRange().GetBegin(), math.MaxInt64)
task := &L0ImportTask{
ImportTaskV2: &datapb.ImportTaskV2{
JobID: req.GetJobID(),
@ -69,6 +74,7 @@ func NewL0ImportTask(req *datapb.ImportRequest,
cancel: cancel,
segmentsInfo: make(map[int64]*datapb.ImportSegmentInfo),
req: req,
allocator: alloc,
manager: manager,
syncMgr: syncMgr,
cm: cm,
@ -146,7 +152,7 @@ func (t *L0ImportTask) Execute() []*conc.Future[any] {
return
}
start := time.Now()
err = t.importL0(reader, t)
err = t.importL0(reader)
if err != nil {
return
}
@ -163,8 +169,7 @@ func (t *L0ImportTask) Execute() []*conc.Future[any] {
return []*conc.Future[any]{f}
}
func (t *L0ImportTask) importL0(reader binlog.L0Reader, task Task) error {
iTask := task.(*L0ImportTask)
func (t *L0ImportTask) importL0(reader binlog.L0Reader) error {
syncFutures := make([]*conc.Future[struct{}], 0)
syncTasks := make([]syncmgr.Task, 0)
for {
@ -175,11 +180,11 @@ func (t *L0ImportTask) importL0(reader binlog.L0Reader, task Task) error {
}
return err
}
delData, err := HashDeleteData(iTask, data)
delData, err := HashDeleteData(t, data)
if err != nil {
return err
}
fs, sts, err := t.syncDelete(iTask, delData)
fs, sts, err := t.syncDelete(delData)
if err != nil {
return err
}
@ -191,33 +196,33 @@ func (t *L0ImportTask) importL0(reader binlog.L0Reader, task Task) error {
return err
}
for _, syncTask := range syncTasks {
segmentInfo, err := NewImportSegmentInfo(syncTask, iTask.metaCaches)
segmentInfo, err := NewImportSegmentInfo(syncTask, t.metaCaches)
if err != nil {
return err
}
t.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo))
log.Info("sync l0 data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...)
t.manager.Update(t.GetTaskID(), UpdateSegmentInfo(segmentInfo))
log.Info("sync l0 data done", WrapLogFields(t, zap.Any("segmentInfo", segmentInfo))...)
}
return nil
}
func (t *L0ImportTask) syncDelete(task *L0ImportTask, delData []*storage.DeleteData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
log.Info("start to sync l0 delete data", WrapLogFields(task)...)
func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
log.Info("start to sync l0 delete data", WrapLogFields(t)...)
futures := make([]*conc.Future[struct{}], 0)
syncTasks := make([]syncmgr.Task, 0)
for channelIdx, data := range delData {
channel := task.GetVchannels()[channelIdx]
channel := t.GetVchannels()[channelIdx]
if data.RowCount == 0 {
continue
}
partitionID := task.GetPartitionIDs()[0]
segmentID := PickSegment(task.req.GetRequestSegments(), channel, partitionID)
syncTask, err := NewSyncTask(task.ctx, task.metaCaches, task.req.GetTs(),
segmentID, partitionID, task.GetCollectionID(), channel, nil, data)
partitionID := t.GetPartitionIDs()[0]
segmentID := PickSegment(t.req.GetRequestSegments(), channel, partitionID)
syncTask, err := NewSyncTask(t.ctx, t.allocator, t.metaCaches, t.req.GetTs(),
segmentID, partitionID, t.GetCollectionID(), channel, nil, data)
if err != nil {
return nil, nil, err
}
future := t.syncMgr.SyncData(task.ctx, syncTask)
future := t.syncMgr.SyncData(t.ctx, syncTask)
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
}

View File

@ -165,6 +165,10 @@ func (s *L0ImportSuite) TestL0Import() {
Vchannel: s.channel,
},
},
AutoIDRange: &datapb.AutoIDRange{
Begin: 0,
End: int64(s.delCnt),
},
}
task := NewL0ImportTask(req, s.manager, s.syncMgr, s.cm)
s.manager.Add(task)

View File

@ -142,7 +142,7 @@ func (t *L0PreImportTask) Execute() []*conc.Future[any] {
return
}
start := time.Now()
err = t.readL0Stat(reader, t)
err = t.readL0Stat(reader)
if err != nil {
return
}
@ -159,7 +159,7 @@ func (t *L0PreImportTask) Execute() []*conc.Future[any] {
return []*conc.Future[any]{f}
}
func (t *L0PreImportTask) readL0Stat(reader binlog.L0Reader, task Task) error {
func (t *L0PreImportTask) readL0Stat(reader binlog.L0Reader) error {
totalRows := 0
totalSize := 0
hashedStats := make(map[string]*datapb.PartitionImportStats)
@ -171,7 +171,7 @@ func (t *L0PreImportTask) readL0Stat(reader binlog.L0Reader, task Task) error {
}
return err
}
stats, err := GetDeleteStats(task, data)
stats, err := GetDeleteStats(t, data)
if err != nil {
return err
}
@ -180,7 +180,7 @@ func (t *L0PreImportTask) readL0Stat(reader binlog.L0Reader, task Task) error {
size := int(data.Size())
totalRows += rows
totalSize += size
log.Info("reading l0 stat...", WrapLogFields(task, zap.Int("readRows", rows), zap.Int("readSize", size))...)
log.Info("reading l0 stat...", WrapLogFields(t, zap.Int("readRows", rows), zap.Int("readSize", size))...)
}
stat := &datapb.ImportFileStats{
@ -188,6 +188,6 @@ func (t *L0PreImportTask) readL0Stat(reader binlog.L0Reader, task Task) error {
TotalMemorySize: int64(totalSize),
HashedStats: hashedStats,
}
t.manager.Update(task.GetTaskID(), UpdateFileStat(0, stat))
t.manager.Update(t.GetTaskID(), UpdateFileStat(0, stat))
return nil
}

View File

@ -85,66 +85,66 @@ func NewPreImportTask(req *datapb.PreImportRequest,
}
}
func (p *PreImportTask) GetPartitionIDs() []int64 {
return p.partitionIDs
func (t *PreImportTask) GetPartitionIDs() []int64 {
return t.partitionIDs
}
func (p *PreImportTask) GetVchannels() []string {
return p.vchannels
func (t *PreImportTask) GetVchannels() []string {
return t.vchannels
}
func (p *PreImportTask) GetType() TaskType {
func (t *PreImportTask) GetType() TaskType {
return PreImportTaskType
}
func (p *PreImportTask) GetSchema() *schemapb.CollectionSchema {
return p.schema
func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema {
return t.schema
}
func (p *PreImportTask) Cancel() {
p.cancel()
func (t *PreImportTask) Cancel() {
t.cancel()
}
func (p *PreImportTask) Clone() Task {
ctx, cancel := context.WithCancel(p.ctx)
func (t *PreImportTask) Clone() Task {
ctx, cancel := context.WithCancel(t.ctx)
return &PreImportTask{
PreImportTask: typeutil.Clone(p.PreImportTask),
PreImportTask: typeutil.Clone(t.PreImportTask),
ctx: ctx,
cancel: cancel,
partitionIDs: p.GetPartitionIDs(),
vchannels: p.GetVchannels(),
schema: p.GetSchema(),
options: p.options,
partitionIDs: t.GetPartitionIDs(),
vchannels: t.GetVchannels(),
schema: t.GetSchema(),
options: t.options,
}
}
func (p *PreImportTask) Execute() []*conc.Future[any] {
func (t *PreImportTask) Execute() []*conc.Future[any] {
bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
log.Info("start to preimport", WrapLogFields(p,
log.Info("start to preimport", WrapLogFields(t,
zap.Int("bufferSize", bufferSize),
zap.Any("schema", p.GetSchema()))...)
p.manager.Update(p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
files := lo.Map(p.GetFileStats(),
zap.Any("schema", t.GetSchema()))...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
files := lo.Map(t.GetFileStats(),
func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
return fileStat.GetImportFile()
})
fn := func(i int, file *internalpb.ImportFile) error {
reader, err := importutilv2.NewReader(p.ctx, p.cm, p.GetSchema(), file, p.options, bufferSize)
reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, t.options, bufferSize)
if err != nil {
log.Warn("new reader failed", WrapLogFields(p, zap.String("file", file.String()), zap.Error(err))...)
p.manager.Update(p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
log.Warn("new reader failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
return err
}
defer reader.Close()
start := time.Now()
err = p.readFileStat(reader, p, i)
err = t.readFileStat(reader, i)
if err != nil {
log.Warn("preimport failed", WrapLogFields(p, zap.String("file", file.String()), zap.Error(err))...)
p.manager.Update(p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
log.Warn("preimport failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
return err
}
log.Info("read file stat done", WrapLogFields(p, zap.Strings("files", file.GetPaths()),
log.Info("read file stat done", WrapLogFields(t, zap.Strings("files", file.GetPaths()),
zap.Duration("dur", time.Since(start)))...)
return nil
}
@ -162,7 +162,7 @@ func (p *PreImportTask) Execute() []*conc.Future[any] {
return futures
}
func (p *PreImportTask) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error {
func (t *PreImportTask) readFileStat(reader importutilv2.Reader, fileIdx int) error {
fileSize, err := reader.Size()
if err != nil {
return err
@ -185,11 +185,11 @@ func (p *PreImportTask) readFileStat(reader importutilv2.Reader, task Task, file
}
return err
}
err = CheckRowsEqual(task.GetSchema(), data)
err = CheckRowsEqual(t.GetSchema(), data)
if err != nil {
return err
}
rowsCount, err := GetRowsStats(task, data)
rowsCount, err := GetRowsStats(t, data)
if err != nil {
return err
}
@ -198,7 +198,7 @@ func (p *PreImportTask) readFileStat(reader importutilv2.Reader, task Task, file
size := data.GetMemorySize()
totalRows += rows
totalSize += size
log.Info("reading file stat...", WrapLogFields(task, zap.Int("readRows", rows), zap.Int("readSize", size))...)
log.Info("reading file stat...", WrapLogFields(t, zap.Int("readRows", rows), zap.Int("readSize", size))...)
}
stat := &datapb.ImportFileStats{
@ -207,6 +207,6 @@ func (p *PreImportTask) readFileStat(reader importutilv2.Reader, task Task, file
TotalMemorySize: int64(totalSize),
HashedStats: hashedStats,
}
p.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat))
t.manager.Update(t.GetTaskID(), UpdateFileStat(fileIdx, stat))
return nil
}

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -44,6 +45,7 @@ func WrapTaskNotFoundError(taskID int64) error {
}
func NewSyncTask(ctx context.Context,
allocator allocator.Interface,
metaCaches map[string]metacache.MetaCache,
ts uint64,
segmentID, partitionID, collectionID int64, vchannel string,
@ -71,6 +73,7 @@ func NewSyncTask(ctx context.Context,
var serializer syncmgr.Serializer
var err error
serializer, err = syncmgr.NewStorageSerializer(
allocator,
metaCache,
nil,
)
@ -152,17 +155,19 @@ func CheckRowsEqual(schema *schemapb.CollectionSchema, data *storage.InsertData)
}
func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData) error {
idRange := task.req.GetAutoIDRange()
pkField, err := typeutil.GetPrimaryFieldSchema(task.GetSchema())
if err != nil {
return err
}
rowNum := GetInsertDataRowCount(data, task.GetSchema())
ids := make([]int64, rowNum)
for i := 0; i < rowNum; i++ {
ids[i] = idRange.GetBegin() + int64(i)
start, _, err := task.allocator.Alloc(uint32(rowNum))
if err != nil {
return err
}
for i := 0; i < rowNum; i++ {
ids[i] = start + int64(i)
}
idRange.Begin += int64(rowNum)
if pkField.GetAutoID() {
switch pkField.GetDataType() {
case schemapb.DataType_Int64:

View File

@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/testutil"
"github.com/milvus-io/milvus/pkg/common"
@ -57,13 +58,10 @@ func Test_AppendSystemFieldsData(t *testing.T) {
schema := &schemapb.CollectionSchema{}
task := &ImportTask{
req: &datapb.ImportRequest{
Ts: 1000,
AutoIDRange: &datapb.AutoIDRange{
Begin: 0,
End: count,
},
Ts: 1000,
Schema: schema,
},
allocator: allocator.NewLocalAllocator(0, count*2),
}
pkField.DataType = schemapb.DataType_Int64

View File

@ -25,6 +25,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
@ -44,11 +45,12 @@ type storageV1Serializer struct {
inCodec *storage.InsertCodec
delCodec *storage.DeleteCodec
allocator allocator.Interface
metacache metacache.MetaCache
metaWriter MetaWriter
}
func NewStorageSerializer(metacache metacache.MetaCache, metaWriter MetaWriter) (*storageV1Serializer, error) {
func NewStorageSerializer(allocator allocator.Interface, metacache metacache.MetaCache, metaWriter MetaWriter) (*storageV1Serializer, error) {
collectionID := metacache.Collection()
schema := metacache.Schema()
pkField := lo.FindOrElse(schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() })
@ -67,6 +69,7 @@ func NewStorageSerializer(metacache metacache.MetaCache, metaWriter MetaWriter)
inCodec: inCodec,
delCodec: storage.NewDeleteCodec(),
allocator: allocator,
metacache: metacache,
metaWriter: metaWriter,
}, nil
@ -135,6 +138,7 @@ func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
}
s.setTaskMeta(task, pack)
task.WithAllocator(s.allocator)
metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), pack.level.String()).Observe(float64(tr.RecordSpan().Milliseconds()))
return task, nil

View File

@ -30,10 +30,12 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
@ -47,6 +49,7 @@ type StorageV1SerializerSuite struct {
schema *schemapb.CollectionSchema
mockAllocator *allocator.MockAllocator
mockCache *metacache.MockMetaCache
mockMetaWriter *MockMetaWriter
@ -54,6 +57,8 @@ type StorageV1SerializerSuite struct {
}
func (s *StorageV1SerializerSuite) SetupSuite() {
paramtable.Init()
s.collectionID = rand.Int63n(100) + 1000
s.partitionID = rand.Int63n(100) + 2000
s.segmentID = rand.Int63n(1000) + 10000
@ -80,6 +85,7 @@ func (s *StorageV1SerializerSuite) SetupSuite() {
},
}
s.mockAllocator = allocator.NewMockAllocator(s.T())
s.mockCache = metacache.NewMockMetaCache(s.T())
s.mockMetaWriter = NewMockMetaWriter(s.T())
}
@ -89,7 +95,7 @@ func (s *StorageV1SerializerSuite) SetupTest() {
s.mockCache.EXPECT().Schema().Return(s.schema)
var err error
s.serializer, err = NewStorageSerializer(s.mockCache, s.mockMetaWriter)
s.serializer, err = NewStorageSerializer(s.mockAllocator, s.mockCache, s.mockMetaWriter)
s.Require().NoError(err)
}
@ -314,7 +320,7 @@ func (s *StorageV1SerializerSuite) TestBadSchema() {
mockCache := metacache.NewMockMetaCache(s.T())
mockCache.EXPECT().Collection().Return(s.collectionID).Once()
mockCache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}).Once()
_, err := NewStorageSerializer(mockCache, s.mockMetaWriter)
_, err := NewStorageSerializer(s.mockAllocator, mockCache, s.mockMetaWriter)
s.Error(err)
}

View File

@ -29,6 +29,7 @@ import (
milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus-storage/go/storage/schema"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
@ -54,10 +55,11 @@ type storageV2Serializer struct {
func NewStorageV2Serializer(
storageV2Cache *metacache.StorageV2Cache,
allocator allocator.Interface,
metacache metacache.MetaCache,
metaWriter MetaWriter,
) (*storageV2Serializer, error) {
v1Serializer, err := NewStorageSerializer(metacache, metaWriter)
v1Serializer, err := NewStorageSerializer(allocator, metacache, metaWriter)
if err != nil {
return nil, err
}

View File

@ -33,6 +33,7 @@ import (
milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus-storage/go/storage/schema"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
@ -51,6 +52,7 @@ type StorageV2SerializerSuite struct {
schema *schemapb.CollectionSchema
storageCache *metacache.StorageV2Cache
mockAllocator *allocator.MockAllocator
mockCache *metacache.MockMetaCache
mockMetaWriter *MockMetaWriter
@ -86,6 +88,7 @@ func (s *StorageV2SerializerSuite) SetupSuite() {
},
}
s.mockAllocator = allocator.NewMockAllocator(s.T())
s.mockCache = metacache.NewMockMetaCache(s.T())
s.mockMetaWriter = NewMockMetaWriter(s.T())
}
@ -98,7 +101,7 @@ func (s *StorageV2SerializerSuite) SetupTest() {
s.mockCache.EXPECT().Collection().Return(s.collectionID)
s.mockCache.EXPECT().Schema().Return(s.schema)
s.serializer, err = NewStorageV2Serializer(storageCache, s.mockCache, s.mockMetaWriter)
s.serializer, err = NewStorageV2Serializer(storageCache, s.mockAllocator, s.mockCache, s.mockMetaWriter)
s.Require().NoError(err)
}
@ -354,7 +357,7 @@ func (s *StorageV2SerializerSuite) TestBadSchema() {
mockCache := metacache.NewMockMetaCache(s.T())
mockCache.EXPECT().Collection().Return(s.collectionID).Once()
mockCache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}).Once()
_, err := NewStorageV2Serializer(s.storageCache, mockCache, s.mockMetaWriter)
_, err := NewStorageV2Serializer(s.storageCache, s.mockAllocator, mockCache, s.mockMetaWriter)
s.Error(err)
}

View File

@ -51,12 +51,11 @@ type SyncManager interface {
type syncManager struct {
*keyLockDispatcher[int64]
chunkManager storage.ChunkManager
allocator allocator.Interface
tasks *typeutil.ConcurrentMap[string, Task]
}
func NewSyncManager(chunkManager storage.ChunkManager, allocator allocator.Interface) (SyncManager, error) {
func NewSyncManager(chunkManager storage.ChunkManager) (SyncManager, error) {
params := paramtable.Get()
initPoolSize := params.DataNodeCfg.MaxParallelSyncMgrTasks.GetAsInt()
if initPoolSize < 1 {
@ -68,7 +67,6 @@ func NewSyncManager(chunkManager storage.ChunkManager, allocator allocator.Inter
syncMgr := &syncManager{
keyLockDispatcher: dispatcher,
chunkManager: chunkManager,
allocator: allocator,
tasks: typeutil.NewConcurrentMap[string, Task](),
}
// setup config update watcher
@ -100,9 +98,8 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) {
func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
switch t := task.(type) {
case *SyncTask:
t.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager)
t.WithChunkManager(mgr.chunkManager)
case *SyncTaskV2:
t.WithAllocator(mgr.allocator)
}
return mgr.safeSubmitTask(ctx, task, callbacks...)

View File

@ -145,7 +145,8 @@ func (s *SyncManagerSuite) getSuiteSyncTask() *SyncTask {
WithSchema(s.schema).
WithChunkManager(s.chunkManager).
WithAllocator(s.allocator).
WithMetaCache(s.metacache)
WithMetaCache(s.metacache).
WithAllocator(s.allocator)
return task
}
@ -159,7 +160,7 @@ func (s *SyncManagerSuite) TestSubmit() {
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
manager, err := NewSyncManager(s.chunkManager, s.allocator)
manager, err := NewSyncManager(s.chunkManager)
s.NoError(err)
task := s.getSuiteSyncTask()
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
@ -189,7 +190,7 @@ func (s *SyncManagerSuite) TestCompacted() {
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
manager, err := NewSyncManager(s.chunkManager, s.allocator)
manager, err := NewSyncManager(s.chunkManager)
s.NoError(err)
task := s.getSuiteSyncTask()
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
@ -209,7 +210,7 @@ func (s *SyncManagerSuite) TestCompacted() {
}
func (s *SyncManagerSuite) TestResizePool() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
manager, err := NewSyncManager(s.chunkManager)
s.NoError(err)
syncMgr, ok := manager.(*syncManager)
@ -245,7 +246,7 @@ func (s *SyncManagerSuite) TestResizePool() {
}
func (s *SyncManagerSuite) TestNewSyncManager() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
manager, err := NewSyncManager(s.chunkManager)
s.NoError(err)
_, ok := manager.(*syncManager)
@ -257,12 +258,12 @@ func (s *SyncManagerSuite) TestNewSyncManager() {
params.Save(configKey, "0")
_, err = NewSyncManager(s.chunkManager, s.allocator)
_, err = NewSyncManager(s.chunkManager)
s.Error(err)
}
func (s *SyncManagerSuite) TestUnexpectedError() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
manager, err := NewSyncManager(s.chunkManager)
s.NoError(err)
task := NewMockTask(s.T())
@ -277,7 +278,7 @@ func (s *SyncManagerSuite) TestUnexpectedError() {
}
func (s *SyncManagerSuite) TestTargetUpdateSameID() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
manager, err := NewSyncManager(s.chunkManager)
s.NoError(err)
task := NewMockTask(s.T())

View File

@ -184,7 +184,7 @@ func (s *SyncTaskSuiteV2) getSuiteSyncTask() *SyncTaskV2 {
s.metacache.EXPECT().Collection().Return(s.collectionID)
s.metacache.EXPECT().Schema().Return(s.schema)
serializer, err := NewStorageV2Serializer(storageCache, s.metacache, nil)
serializer, err := NewStorageV2Serializer(storageCache, s.allocator, s.metacache, nil)
s.Require().NoError(err)
task, err := serializer.EncodeBuffer(context.Background(), pack)
s.Require().NoError(err)

View File

@ -157,11 +157,13 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2
if params.Params.CommonCfg.EnableStorageV2.GetAsBool() {
serializer, err = syncmgr.NewStorageV2Serializer(
storageV2Cache,
option.idAllocator,
metacache,
option.metaWriter,
)
} else {
serializer, err = syncmgr.NewStorageSerializer(
option.idAllocator,
metacache,
option.metaWriter,
)