Fix listImportTasks bug (#20301)

Signed-off-by: yhmo <yihua.mo@zilliz.com>

groot 2022-11-07 17:11:02 +08:00 committed by GitHub
parent 6e4509d0fc
commit b847c425e1
18 changed files with 633 additions and 338 deletions
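In short: importManager.copyTaskInfo now fills in the task ID, listAllTasks filters tasks by collection ID (a negative ID means no filter) and surfaces task-store errors instead of swallowing them, and Core.ListImportTasks resolves the requested collection name to an ID before listing. The sketch below condenses that flow for orientation only; resolveCollectionID is a hypothetical stand-in for the meta.GetCollectionByName lookup shown further down, and error statuses are simplified.

// Hedged sketch of the new ListImportTasks flow, assuming the rootcoord package context; not the actual implementation.
func listImportTasksSketch(ctx context.Context, req *milvuspb.ListImportTasksRequest, mgr *importManager,
	resolveCollectionID func(ctx context.Context, name string) (int64, error)) (*milvuspb.ListImportTasksResponse, error) {
	colID := int64(-1) // -1 means "do not filter by collection"
	if name := req.GetCollectionName(); name != "" {
		id, err := resolveCollectionID(ctx, name) // hypothetical helper wrapping meta.GetCollectionByName
		if err != nil {
			return nil, fmt.Errorf("failed to find collection ID from its name: '%s', error: %w", name, err)
		}
		colID = id
	}
	tasks, err := mgr.listAllTasks(colID, req.GetLimit())
	if err != nil {
		return nil, fmt.Errorf("failed to list import tasks, error: %w", err)
	}
	return &milvuspb.ListImportTasksResponse{
		Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
		Tasks:  tasks,
	}, nil
}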


@ -879,7 +879,7 @@ func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeInd
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IndexNotExist,
Reason: fmt.Sprint("index not exist, collectionID ", req.CollectionID),
Reason: fmt.Sprint("index doesn't exist, collectionID ", req.CollectionID),
},
}, nil
}


@ -349,7 +349,7 @@ func (m *importManager) checkIndexingDone(ctx context.Context, collID UniqueID,
zap.Any("index info", descIdxResp.GetIndexInfos()))
if descIdxResp.GetStatus().GetErrorCode() == commonpb.ErrorCode_IndexNotExist ||
len(descIdxResp.GetIndexInfos()) == 0 {
log.Info("index not exist for collection",
log.Info("index doesn't exist for collection",
zap.Int64("collection ID", collID))
return true, nil
}
@ -694,6 +694,7 @@ func (m *importManager) copyTaskInfo(input *datapb.ImportTaskInfo, output *milvu
output.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
output.Id = input.GetId()
output.CollectionId = input.GetCollectionId()
output.State = input.GetState().GetStateCode()
@ -777,6 +778,7 @@ func (m *importManager) loadFromTaskStore(load2Mem bool) ([]*datapb.ImportTaskIn
return nil, err
}
var taskList []*datapb.ImportTaskInfo
for i := range v {
ti := &datapb.ImportTaskInfo{}
if err := proto.Unmarshal([]byte(v[i]), ti); err != nil {
@ -784,6 +786,7 @@ func (m *importManager) loadFromTaskStore(load2Mem bool) ([]*datapb.ImportTaskIn
// Ignore bad protos.
continue
}
if load2Mem {
// Put pending tasks back to pending task list.
if ti.GetState().GetStateCode() == commonpb.ImportState_ImportPending {
@ -974,38 +977,35 @@ func rearrangeTasks(tasks []*milvuspb.GetImportStateResponse) {
})
}
func (m *importManager) listAllTasks(colName string, limit int64) []*milvuspb.GetImportStateResponse {
tasks := make([]*milvuspb.GetImportStateResponse, 0)
func (m *importManager) listAllTasks(colID int64, limit int64) ([]*milvuspb.GetImportStateResponse, error) {
var importTasks []*datapb.ImportTaskInfo
var err error
if importTasks, err = m.loadFromTaskStore(false); err != nil {
log.Error("failed to load from task store", zap.Error(err))
return tasks
return nil, fmt.Errorf("failed to load task list from etcd, error: %w", err)
}
// filter tasks by collection name
// TODO: how to handle duplicated collection name? for example: a new collection has same name with a dropped collection
tasks := make([]*milvuspb.GetImportStateResponse, 0)
// filter tasks by collection id
// if colID is negative, we will return all tasks
for _, task := range importTasks {
if colName != "" && task.GetCollectionName() != colName {
continue
if colID < 0 || colID == task.GetCollectionId() {
currTask := &milvuspb.GetImportStateResponse{}
m.copyTaskInfo(task, currTask)
tasks = append(tasks, currTask)
}
currTask := &milvuspb.GetImportStateResponse{}
m.copyTaskInfo(task, currTask)
tasks = append(tasks, currTask)
}
// arrange tasks by id with ascending order, actually, id is the create time of a task
// arrange tasks by id in ascending order; the id actually reflects the create time of a task
rearrangeTasks(tasks)
// if limit is 0 or larger than length of tasks, return all tasks
if limit <= 0 || limit >= int64(len(tasks)) {
return tasks
return tasks, nil
}
// return the newest tasks from the tail
return tasks[len(tasks)-int(limit):]
return tasks[len(tasks)-int(limit):], nil
}
// removeBadImportSegments marks segments of a failed import task as `dropped`.

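For orientation, a minimal usage sketch of the reworked listAllTasks signature; mgr (an initialized importManager) and collectionID are assumed to exist and are not part of the change:

// Illustrative only: shows the filter and limit semantics of the new signature.
func listTasksExample(mgr *importManager, collectionID int64) {
	// A negative collection ID lists tasks of all collections; a limit <= 0 returns everything.
	allTasks, err := mgr.listAllTasks(-1, 0)
	if err != nil {
		log.Error("failed to list import tasks", zap.Error(err))
		return
	}
	// Only the latest 3 tasks of one collection; tasks are sorted by id, which tracks create time.
	latestTasks, err := mgr.listAllTasks(collectionID, 3)
	if err != nil {
		log.Error("failed to list import tasks", zap.Error(err))
		return
	}
	log.Info("import tasks listed", zap.Int("allCount", len(allTasks)), zap.Int("latestCount", len(latestTasks)))
}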

@ -19,6 +19,7 @@ package rootcoord
import (
"context"
"errors"
"strings"
"sync"
"testing"
"time"
@ -525,7 +526,7 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) {
})
wg.Add(1)
t.Run("describe index with index not exist", func(t *testing.T) {
t.Run("describe index with index doesn't exist", func(t *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -920,8 +921,6 @@ func TestImportManager_ListAllTasks(t *testing.T) {
}
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
colID := int64(100)
mockKv := memkv.NewMemoryKV()
// reject some tasks so there are 3 tasks left in pending list
fn := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
@ -932,41 +931,113 @@ func TestImportManager_ListAllTasks(t *testing.T) {
}, nil
}
rowReq := &milvuspb.ImportRequest{
CollectionName: "c1",
PartitionName: "p1",
Files: []string{"f1.json"},
}
callMarkSegmentsDropped := func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn, callMarkSegmentsDropped, nil, nil, nil, nil)
repeat := 10
for i := 0; i < repeat; i++ {
mgr.importJob(context.TODO(), rowReq, colID, 0)
colID1 := int64(100)
colID2 := int64(101)
colName1 := "c1"
colName2 := "c2"
partID1 := int64(200)
partID2 := int64(201)
partName1 := "p1"
partName2 := "p2"
getCollectionName := func(collID, partitionID typeutil.UniqueID) (string, string, error) {
collectionName := "unknow"
if collID == colID1 {
collectionName = colName1
} else if collID == colID2 {
collectionName = colName2
}
partitionName := "unknow"
if partitionID == partID1 {
partitionName = partName1
} else if partitionID == partID2 {
partitionName = partName2
}
return collectionName, partitionName, nil
}
// list all tasks
tasks := mgr.listAllTasks("", int64(repeat))
assert.Equal(t, repeat, len(tasks))
for i := 0; i < repeat; i++ {
assert.Equal(t, int64(i+1), tasks[i].Id)
mockKv := memkv.NewMemoryKV()
mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn, callMarkSegmentsDropped, getCollectionName, nil, nil, nil)
// add 10 tasks for collection1, id from 1 to 10
file1 := "f1.json"
rowReq1 := &milvuspb.ImportRequest{
CollectionName: colName1,
PartitionName: partName1,
Files: []string{file1},
}
repeat1 := 10
for i := 0; i < repeat1; i++ {
mgr.importJob(context.TODO(), rowReq1, colID1, partID1)
}
// list few tasks
// add 5 tasks for collection2, id from 11 to 15, 15 tasks in total
file2 := "f2.json"
rowReq2 := &milvuspb.ImportRequest{
CollectionName: colName2,
PartitionName: partName2,
Files: []string{file2},
}
repeat2 := 5
for i := 0; i < repeat2; i++ {
mgr.importJob(context.TODO(), rowReq2, colID2, partID2)
}
verifyTaskFunc := func(task *milvuspb.GetImportStateResponse, taskID int64, colID int64, state commonpb.ImportState) {
assert.Equal(t, commonpb.ErrorCode_Success, task.GetStatus().ErrorCode)
assert.Equal(t, taskID, task.GetId())
assert.Equal(t, colID, task.GetCollectionId())
assert.Equal(t, state, task.GetState())
compareReq := rowReq1
if colID == colID2 {
compareReq = rowReq2
}
for _, kv := range task.GetInfos() {
if kv.GetKey() == CollectionName {
assert.Equal(t, compareReq.GetCollectionName(), kv.GetValue())
} else if kv.GetKey() == PartitionName {
assert.Equal(t, compareReq.GetPartitionName(), kv.GetValue())
} else if kv.GetKey() == Files {
assert.Equal(t, strings.Join(compareReq.GetFiles(), ","), kv.GetValue())
}
}
}
// list all tasks of collection1, id from 1 to 10
tasks, err := mgr.listAllTasks(colID1, int64(repeat1))
assert.NoError(t, err)
assert.Equal(t, repeat1, len(tasks))
for i := 0; i < repeat1; i++ {
verifyTaskFunc(tasks[i], int64(i+1), colID1, commonpb.ImportState_ImportPending)
}
// list latest 3 tasks of collection1, id from 8 to 10
limit := 3
tasks = mgr.listAllTasks("", int64(limit))
tasks, err = mgr.listAllTasks(colID1, int64(limit))
assert.NoError(t, err)
assert.Equal(t, limit, len(tasks))
for i := 0; i < limit; i++ {
assert.Equal(t, int64(i+repeat-limit+1), tasks[i].Id)
verifyTaskFunc(tasks[i], int64(i+repeat1-limit+1), colID1, commonpb.ImportState_ImportPending)
}
// list all tasks of collection2, id from 11 to 15
tasks, err = mgr.listAllTasks(colID2, int64(repeat2))
assert.NoError(t, err)
assert.Equal(t, repeat2, len(tasks))
for i := 0; i < repeat2; i++ {
verifyTaskFunc(tasks[i], int64(i+repeat1+1), colID2, commonpb.ImportState_ImportPending)
}
// get the first task state
resp := mgr.getTaskState(1)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, commonpb.ImportState_ImportPending, resp.State)
assert.Equal(t, int64(1), resp.Id)
verifyTaskFunc(resp, int64(1), colID1, commonpb.ImportState_ImportPending)
// accept tasks to working list
mgr.callImportService = func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
@ -977,10 +1048,14 @@ func TestImportManager_ListAllTasks(t *testing.T) {
}, nil
}
// there are 10 tasks in working list, and 1 task in pending list, totally 11 tasks
mgr.importJob(context.TODO(), rowReq, colID, 0)
tasks = mgr.listAllTasks("", 100)
assert.Equal(t, repeat+1, len(tasks))
// there are 15 tasks in the working list, and 1 task for collection1 in the pending list, 16 tasks in total
mgr.importJob(context.TODO(), rowReq1, colID1, partID1)
tasks, err = mgr.listAllTasks(-1, 0)
assert.NoError(t, err)
assert.Equal(t, repeat1+repeat2+1, len(tasks))
for i := 0; i < len(tasks); i++ {
assert.Equal(t, commonpb.ImportState_ImportStarted, tasks[i].GetState())
}
// the id of tasks must be 1,2,3,4,5,6(sequence not guaranteed)
ids := make(map[int64]struct{})
@ -992,13 +1067,19 @@ func TestImportManager_ListAllTasks(t *testing.T) {
}
assert.Equal(t, 0, len(ids))
// list few tasks
tasks = mgr.listAllTasks("", 1)
// list the latest task, the task is for collection1
tasks, err = mgr.listAllTasks(-1, 1)
assert.NoError(t, err)
assert.Equal(t, 1, len(tasks))
verifyTaskFunc(tasks[0], int64(repeat1+repeat2+1), colID1, commonpb.ImportState_ImportStarted)
// invliad collection name, returns empty
tasks = mgr.listAllTasks("bad-collection-name", 1)
assert.Equal(t, 0, len(tasks))
// failed to load task from store
mockTxnKV := &mocks.TxnKV{}
mockTxnKV.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, errors.New("mock error"))
mgr.taskStore = mockTxnKV
tasks, err = mgr.listAllTasks(-1, 0)
assert.Error(t, err)
assert.Nil(t, tasks)
}
func TestImportManager_setCollectionPartitionName(t *testing.T) {


@ -1652,11 +1652,37 @@ func (c *Core) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTask
}, nil
}
colID := int64(-1)
collectionName := req.GetCollectionName()
if len(collectionName) != 0 {
// if the collection name is specified but not found, either the user input a wrong name, or the collection doesn't exist or has been dropped.
// we return an error to notify the user that the name is incorrect.
colInfo, err := c.meta.GetCollectionByName(ctx, req.GetCollectionName(), typeutil.MaxTimestamp)
if err != nil {
err = fmt.Errorf("failed to find collection ID from its name: '%s', error: %w", req.GetCollectionName(), err)
log.Error("ListImportTasks failed", zap.Error(err))
return &milvuspb.ListImportTasksResponse{
Status: failStatus(commonpb.ErrorCode_IllegalCollectionName, err.Error()),
}, nil
}
colID = colInfo.CollectionID
}
// if the collection name is not specified, colID stays -1 and listAllTasks returns all tasks
tasks, err := c.importManager.listAllTasks(colID, req.GetLimit())
if err != nil {
err = fmt.Errorf("failed to list import tasks, collection name: '%s', error: %w", req.GetCollectionName(), err)
log.Error("ListImportTasks failed", zap.Error(err))
return &milvuspb.ListImportTasksResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()),
}, nil
}
resp := &milvuspb.ListImportTasksResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Tasks: c.importManager.listAllTasks(req.GetCollectionName(), req.GetLimit()),
Tasks: tasks,
}
return resp, nil
}


@ -14,6 +14,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/allocator"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
@ -29,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestRootCoord_CreateCollection(t *testing.T) {
@ -918,22 +920,25 @@ func TestCore_ListImportTasks(t *testing.T) {
ti1 := &datapb.ImportTaskInfo{
Id: 100,
CollectionName: "collection-A",
CollectionId: 1,
State: &datapb.ImportTaskState{
StateCode: commonpb.ImportState_ImportPending,
},
CreateTs: time.Now().Unix() - 100,
CreateTs: time.Now().Unix() - 300,
}
ti2 := &datapb.ImportTaskInfo{
Id: 200,
CollectionName: "collection-A",
CollectionId: 1,
State: &datapb.ImportTaskState{
StateCode: commonpb.ImportState_ImportPersisted,
},
CreateTs: time.Now().Unix() - 100,
CreateTs: time.Now().Unix() - 200,
}
ti3 := &datapb.ImportTaskInfo{
Id: 300,
CollectionName: "collection-B",
CollectionId: 2,
State: &datapb.ImportTaskState{
StateCode: commonpb.ImportState_ImportPersisted,
},
@ -945,7 +950,7 @@ func TestCore_ListImportTasks(t *testing.T) {
assert.NoError(t, err)
taskInfo3, err := proto.Marshal(ti3)
assert.NoError(t, err)
mockKv.Save(BuildImportTaskKey(1), "value")
mockKv.Save(BuildImportTaskKey(1), "value") // this item will trigger an error log in importManager.loadFromTaskStore()
mockKv.Save(BuildImportTaskKey(100), string(taskInfo1))
mockKv.Save(BuildImportTaskKey(200), string(taskInfo2))
mockKv.Save(BuildImportTaskKey(300), string(taskInfo3))
@ -958,14 +963,83 @@ func TestCore_ListImportTasks(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
verifyTaskFunc := func(task *milvuspb.GetImportStateResponse, taskID int64, colID int64, state commonpb.ImportState) {
assert.Equal(t, commonpb.ErrorCode_Success, task.GetStatus().ErrorCode)
assert.Equal(t, taskID, task.GetId())
assert.Equal(t, state, task.GetState())
assert.Equal(t, colID, task.GetCollectionId())
}
t.Run("normal case", func(t *testing.T) {
meta := newMockMetaTable()
meta.GetCollectionByNameFunc = func(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
if collectionName == ti1.CollectionName {
return &model.Collection{
CollectionID: ti1.CollectionId,
}, nil
} else if collectionName == ti3.CollectionName {
return &model.Collection{
CollectionID: ti3.CollectionId,
}, nil
}
return nil, errors.New("GetCollectionByName error")
}
ctx := context.Background()
c := newTestCore(withHealthyCode())
c := newTestCore(withHealthyCode(), withMeta(meta))
c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil)
// list all tasks
resp, err := c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{})
assert.NoError(t, err)
assert.Equal(t, 3, len(resp.GetTasks()))
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
verifyTaskFunc(resp.GetTasks()[0], 100, 1, commonpb.ImportState_ImportPending)
verifyTaskFunc(resp.GetTasks()[1], 200, 1, commonpb.ImportState_ImportPersisted)
verifyTaskFunc(resp.GetTasks()[2], 300, 2, commonpb.ImportState_ImportPersisted)
// list tasks of collection-A
resp, err = c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{
CollectionName: "collection-A",
})
assert.NoError(t, err)
assert.Equal(t, 2, len(resp.GetTasks()))
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
// list tasks of collection-B
resp, err = c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{
CollectionName: "collection-B",
})
assert.NoError(t, err)
assert.Equal(t, 1, len(resp.GetTasks()))
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
// invalid collection name
resp, err = c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{
CollectionName: "dummy",
})
assert.NoError(t, err)
assert.Equal(t, 0, len(resp.GetTasks()))
assert.Equal(t, commonpb.ErrorCode_IllegalCollectionName, resp.GetStatus().GetErrorCode())
// list the latest 2 tasks
resp, err = c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{
Limit: 2,
})
assert.NoError(t, err)
assert.Equal(t, 2, len(resp.GetTasks()))
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
verifyTaskFunc(resp.GetTasks()[0], 200, 1, commonpb.ImportState_ImportPersisted)
verifyTaskFunc(resp.GetTasks()[1], 300, 2, commonpb.ImportState_ImportPersisted)
// failed to load tasks from store
mockTxnKV := &mocks.TxnKV{}
mockTxnKV.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, errors.New("mock error"))
c.importManager.taskStore = mockTxnKV
resp, err = c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{})
assert.NoError(t, err)
assert.Equal(t, 0, len(resp.GetTasks()))
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
})
}


@ -217,8 +217,8 @@ func (p *BinlogAdapter) Read(segmentHolder *SegmentFilesHolder) error {
return err
}
} else {
log.Error("Binlog adapter: unknow primary key type", zap.Int("type", int(p.primaryType)))
return errors.New("unknow primary key type")
log.Error("Binlog adapter: unsupported primary key type", zap.Int("type", int(p.primaryType)))
return fmt.Errorf("unsupported primary key type %d, primary key should be int64 or varchar", p.primaryType)
}
// if shardList is empty, that means all the primary keys have been deleted(or skipped), no need to read other files
@ -270,7 +270,7 @@ func (p *BinlogAdapter) verify(segmentHolder *SegmentFilesHolder) error {
files, ok := segmentHolder.fieldFiles[schema.FieldID]
if !ok {
log.Error("Binlog adapter: a field has no binlog file", zap.Int64("fieldID", schema.FieldID))
return errors.New("the field " + strconv.Itoa(int(schema.FieldID)) + " has no binlog file")
return fmt.Errorf("the field %d has no binlog file", schema.FieldID)
}
if i == 0 {
@ -296,7 +296,8 @@ func (p *BinlogAdapter) verify(segmentHolder *SegmentFilesHolder) error {
for _, files := range segmentHolder.fieldFiles {
if firstFieldFileCount != len(files) {
log.Error("Binlog adapter: file count of each field must be equal", zap.Int("firstFieldFileCount", firstFieldFileCount))
return errors.New("binlog file count of each field must be equal")
return fmt.Errorf("binlog file count of each field must be equal, first field files count: %d, other field files count: %d",
firstFieldFileCount, len(files))
}
}
@ -332,8 +333,8 @@ func (p *BinlogAdapter) readDeltalogs(segmentHolder *SegmentFilesHolder) (map[in
log.Info("Binlog adapter: count of deleted entities", zap.Int("deletedCount", len(deletedIDDict)))
return nil, deletedIDDict, nil
} else {
log.Error("Binlog adapter: primary key is neither int64 nor varchar")
return nil, nil, errors.New("primary key is neither int64 nor varchar")
log.Error("Binlog adapter: unsupported primary key type", zap.Int("type", int(p.primaryType)))
return nil, nil, fmt.Errorf("unsupported primary key type %d, primary key should be int64 or varchar", p.primaryType)
}
}
@ -378,7 +379,8 @@ func (p *BinlogAdapter) decodeDeleteLogs(segmentHolder *SegmentFilesHolder) ([]*
log.Error("Binlog adapter: delta log data type is not equal to collection's primary key data type",
zap.Int64("deltaDataType", deleteLogs[i].PkType),
zap.Int64("pkDataType", int64(p.primaryType)))
return nil, errors.New("delta log data type is not equal to collection's primary key data type")
return nil, fmt.Errorf("delta log data type %d is not equal to collection's primary key data type %d",
deleteLogs[i].PkType, p.primaryType)
}
}
@ -396,13 +398,13 @@ func (p *BinlogAdapter) decodeDeleteLog(deltaStr string) (*storage.DeleteLog, er
splits := strings.Split(deltaStr, ",")
if len(splits) != 2 {
log.Error("Binlog adapter: the format of deletion string is incorrect", zap.String("deltaStr", deltaStr))
return nil, fmt.Errorf("the format of deletion string is incorrect, %v can not be split", deltaStr)
return nil, fmt.Errorf("the format of deletion string is incorrect, '%s' can not be split", deltaStr)
}
pk, err := strconv.ParseInt(splits[0], 10, 64)
if err != nil {
log.Error("Binlog adapter: failed to parse primary key of deletion string from old version",
zap.String("deltaStr", deltaStr), zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to parse primary key of deletion string '%s' from old version, error: %w", deltaStr, err)
}
deleteLog.Pk = &storage.Int64PrimaryKey{
Value: pk,
@ -412,7 +414,7 @@ func (p *BinlogAdapter) decodeDeleteLog(deltaStr string) (*storage.DeleteLog, er
if err != nil {
log.Error("Binlog adapter: failed to parse timestamp of deletion string from old version",
zap.String("deltaStr", deltaStr), zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to parse timestamp of deletion string '%s' from old version, error: %w", deltaStr, err)
}
}
@ -425,13 +427,13 @@ func (p *BinlogAdapter) readDeltalog(logPath string) ([]string, error) {
binlogFile, err := NewBinlogFile(p.chunkManager)
if err != nil {
log.Error("Binlog adapter: failed to initialize binlog file", zap.String("logPath", logPath), zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to initialize binlog file '%s', error: %w", logPath, err)
}
err = binlogFile.Open(logPath)
if err != nil {
log.Error("Binlog adapter: failed to open delta log", zap.String("logPath", logPath), zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to open delta log '%s', error: %w", logPath, err)
}
defer binlogFile.Close()
@ -439,7 +441,7 @@ func (p *BinlogAdapter) readDeltalog(logPath string) ([]string, error) {
data, err := binlogFile.ReadVarchar()
if err != nil {
log.Error("Binlog adapter: failed to read delta log", zap.String("logPath", logPath), zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to read delta log '%s', error: %w", logPath, err)
}
log.Info("Binlog adapter: successfully read deltalog", zap.Int("deleteCount", len(data)))
@ -452,13 +454,13 @@ func (p *BinlogAdapter) readTimestamp(logPath string) ([]int64, error) {
binlogFile, err := NewBinlogFile(p.chunkManager)
if err != nil {
log.Error("Binlog adapter: failed to initialize binlog file", zap.String("logPath", logPath), zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to initialize binlog file '%s', error: %w", logPath, err)
}
err = binlogFile.Open(logPath)
if err != nil {
log.Error("Binlog adapter: failed to open timestamp log file", zap.String("logPath", logPath))
return nil, err
return nil, fmt.Errorf("failed to open timestamp log file '%s', error: %w", logPath, err)
}
defer binlogFile.Close()
@ -466,7 +468,7 @@ func (p *BinlogAdapter) readTimestamp(logPath string) ([]int64, error) {
int64List, err := binlogFile.ReadInt64()
if err != nil {
log.Error("Binlog adapter: failed to read timestamp data from log file", zap.String("logPath", logPath))
return nil, err
return nil, fmt.Errorf("failed to read timestamp data from log file '%s', error: %w", logPath, err)
}
log.Info("Binlog adapter: read timestamp from log file", zap.Int("tsCount", len(int64List)))
@ -480,13 +482,13 @@ func (p *BinlogAdapter) readPrimaryKeys(logPath string) ([]int64, []string, erro
binlogFile, err := NewBinlogFile(p.chunkManager)
if err != nil {
log.Error("Binlog adapter: failed to initialize binlog file", zap.String("logPath", logPath), zap.Error(err))
return nil, nil, err
return nil, nil, fmt.Errorf("failed to initialize binlog file '%s', error: %w", logPath, err)
}
err = binlogFile.Open(logPath)
if err != nil {
log.Error("Binlog adapter: failed to open primary key binlog", zap.String("logPath", logPath))
return nil, nil, err
return nil, nil, fmt.Errorf("failed to open primary key binlog '%s', error: %w", logPath, err)
}
defer binlogFile.Close()
@ -495,7 +497,7 @@ func (p *BinlogAdapter) readPrimaryKeys(logPath string) ([]int64, []string, erro
idList, err := binlogFile.ReadInt64()
if err != nil {
log.Error("Binlog adapter: failed to read int64 primary key from binlog", zap.String("logPath", logPath), zap.Error(err))
return nil, nil, err
return nil, nil, fmt.Errorf("failed to read int64 primary key from binlog '%s', error: %w", logPath, err)
}
log.Info("Binlog adapter: succeed to read int64 primary key binlog", zap.Int("len", len(idList)))
return idList, nil, nil
@ -503,13 +505,13 @@ func (p *BinlogAdapter) readPrimaryKeys(logPath string) ([]int64, []string, erro
idList, err := binlogFile.ReadVarchar()
if err != nil {
log.Error("Binlog adapter: failed to read varchar primary key from binlog", zap.String("logPath", logPath), zap.Error(err))
return nil, nil, err
return nil, nil, fmt.Errorf("failed to read varchar primary key from binlog '%s', error: %w", logPath, err)
}
log.Info("Binlog adapter: succeed to read varchar primary key binlog", zap.Int("len", len(idList)))
return nil, idList, nil
} else {
log.Error("Binlog adapter: primary key is neither int64 nor varchar")
return nil, nil, errors.New("primary key is neither int64 nor varchar")
log.Error("Binlog adapter: unsupported primary key type", zap.Int("type", int(p.primaryType)))
return nil, nil, fmt.Errorf("unsupported primary key type %d, primary key should be int64 or varchar", p.primaryType)
}
}
@ -524,7 +526,7 @@ func (p *BinlogAdapter) getShardingListByPrimaryInt64(primaryKeys []int64,
if len(timestampList) != len(primaryKeys) {
log.Error("Binlog adapter: primary key length is not equal to timestamp list length",
zap.Int("primaryKeysLen", len(primaryKeys)), zap.Int("timestampLen", len(timestampList)))
return nil, errors.New("primary key length is not equal to timestamp list length")
return nil, fmt.Errorf("primary key length %d is not equal to timestamp list length %d", len(primaryKeys), len(timestampList))
}
log.Info("Binlog adapter: building shard list", zap.Int("pkLen", len(primaryKeys)), zap.Int("tsLen", len(timestampList)))
@ -576,7 +578,7 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string,
if len(timestampList) != len(primaryKeys) {
log.Error("Binlog adapter: primary key length is not equal to timestamp list length",
zap.Int("primaryKeysLen", len(primaryKeys)), zap.Int("timestampLen", len(timestampList)))
return nil, errors.New("primary key length is not equal to timestamp list length")
return nil, fmt.Errorf("primary key length %d is not equal to timestamp list length %d", len(primaryKeys), len(timestampList))
}
log.Info("Binlog adapter: building shard list", zap.Int("pkLen", len(primaryKeys)), zap.Int("tsLen", len(timestampList)))
@ -633,13 +635,13 @@ func (p *BinlogAdapter) readInsertlog(fieldID storage.FieldID, logPath string,
binlogFile, err := NewBinlogFile(p.chunkManager)
if err != nil {
log.Error("Binlog adapter: failed to initialize binlog file", zap.String("logPath", logPath), zap.Error(err))
return err
return fmt.Errorf("failed to initialize binlog file %s, error: %w", logPath, err)
}
err = binlogFile.Open(logPath)
if err != nil {
log.Error("Binlog adapter: failed to open insert log", zap.String("logPath", logPath), zap.Error(err))
return err
return fmt.Errorf("failed to open insert log %s, error: %w", logPath, err)
}
defer binlogFile.Close()
@ -746,7 +748,7 @@ func (p *BinlogAdapter) readInsertlog(fieldID storage.FieldID, logPath string,
return err
}
default:
return errors.New("unsupported data type")
return fmt.Errorf("unsupported data type %d", binlogFile.DataType())
}
log.Info("Binlog adapter: read data into shard list", zap.Int("dataType", int(binlogFile.DataType())), zap.Int("shardLen", len(shardList)))
@ -757,8 +759,8 @@ func (p *BinlogAdapter) dispatchBoolToShards(data []bool, memoryData []map[stora
shardList []int32, fieldID storage.FieldID) error {
// verify row count
if len(data) != len(shardList) {
log.Error("Binlog adapter: bool field row count is not equal to primary key", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return errors.New("bool field row count is not equal to primary key")
log.Error("Binlog adapter: bool field row count is not equal to shard list row count %d", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return fmt.Errorf("bool field row count %d is not equal to shard list row count %d", len(data), len(shardList))
}
// dispatch entities according to shard list
@ -781,8 +783,8 @@ func (p *BinlogAdapter) dispatchInt8ToShards(data []int8, memoryData []map[stora
shardList []int32, fieldID storage.FieldID) error {
// verify row count
if len(data) != len(shardList) {
log.Error("Binlog adapter: int8 field row count is not equal to primary key", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return errors.New("int8 field row count is not equal to primary key")
log.Error("Binlog adapter: int8 field row count is not equal to shard list row count", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return fmt.Errorf("int8 field row count %d is not equal to shard list row count %d", len(data), len(shardList))
}
// dispatch entity according to shard list
@ -805,8 +807,8 @@ func (p *BinlogAdapter) dispatchInt16ToShards(data []int16, memoryData []map[sto
shardList []int32, fieldID storage.FieldID) error {
// verify row count
if len(data) != len(shardList) {
log.Error("Binlog adapter: int16 field row count is not equal to primary key", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return errors.New("int16 field row count is not equal to primary key")
log.Error("Binlog adapter: int16 field row count is not equal to shard list row count", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return fmt.Errorf("int16 field row count %d is not equal to shard list row count %d", len(data), len(shardList))
}
// dispatch entities according to shard list
@ -829,8 +831,8 @@ func (p *BinlogAdapter) dispatchInt32ToShards(data []int32, memoryData []map[sto
shardList []int32, fieldID storage.FieldID) error {
// verify row count
if len(data) != len(shardList) {
log.Error("Binlog adapter: int32 field row count is not equal to primary key", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return errors.New("int32 field row count is not equal to primary key")
log.Error("Binlog adapter: int32 field row count is not equal to shard list row count", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return fmt.Errorf("int32 field row count %d is not equal to shard list row count %d", len(data), len(shardList))
}
// dispatch entities according to shard list
@ -853,8 +855,8 @@ func (p *BinlogAdapter) dispatchInt64ToShards(data []int64, memoryData []map[sto
shardList []int32, fieldID storage.FieldID) error {
// verify row count
if len(data) != len(shardList) {
log.Error("Binlog adapter: int64 field row count is not equal to primary key", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return errors.New("int64 field row count is not equal to primary key")
log.Error("Binlog adapter: int64 field row count is not equal to shard list row count", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return fmt.Errorf("int64 field row count %d is not equal to shard list row count %d", len(data), len(shardList))
}
// dispatch entities according to shard list
@ -877,8 +879,8 @@ func (p *BinlogAdapter) dispatchFloatToShards(data []float32, memoryData []map[s
shardList []int32, fieldID storage.FieldID) error {
// verify row count
if len(data) != len(shardList) {
log.Error("Binlog adapter: float field row count is not equal to primary key", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return errors.New("float field row count is not equal to primary key")
log.Error("Binlog adapter: float field row count is not equal to shard list row count", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return fmt.Errorf("float field row count %d is not equal to shard list row count %d", len(data), len(shardList))
}
// dispatch entities according to shard list
@ -901,8 +903,8 @@ func (p *BinlogAdapter) dispatchDoubleToShards(data []float64, memoryData []map[
shardList []int32, fieldID storage.FieldID) error {
// verify row count
if len(data) != len(shardList) {
log.Error("Binlog adapter: double field row count is not equal to primary key", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return errors.New("double field row count is not equal to primary key")
log.Error("Binlog adapter: double field row count is not equal to shard list row count", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return fmt.Errorf("double field row count %d is not equal to shard list row count %d", len(data), len(shardList))
}
// dispatch entities according to shard list
@ -925,8 +927,8 @@ func (p *BinlogAdapter) dispatchVarcharToShards(data []string, memoryData []map[
shardList []int32, fieldID storage.FieldID) error {
// verify row count
if len(data) != len(shardList) {
log.Error("Binlog adapter: varchar field row count is not equal to primary key", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return errors.New("varchar field row count is not equal to primary key")
log.Error("Binlog adapter: varchar field row count is not equal to shard list row count", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList)))
return fmt.Errorf("varchar field row count %d is not equal to shard list row count %d", len(data), len(shardList))
}
// dispatch entities according to shard list
@ -951,8 +953,9 @@ func (p *BinlogAdapter) dispatchBinaryVecToShards(data []byte, dim int, memoryDa
bytesPerVector := dim / 8
count := len(data) / bytesPerVector
if count != len(shardList) {
log.Error("Binlog adapter: binary vector field row count is not equal to primary key", zap.Int("dataLen", count), zap.Int("shardLen", len(shardList)))
return errors.New("binary vector field row count is not equal to primary key")
log.Error("Binlog adapter: binary vector field row count is not equal to shard list row count",
zap.Int("dataLen", count), zap.Int("shardLen", len(shardList)))
return fmt.Errorf("binary vector field row count %d is not equal to shard list row count %d", len(data), len(shardList))
}
// dispatch entities according to shard list
@ -966,12 +969,14 @@ func (p *BinlogAdapter) dispatchBinaryVecToShards(data []byte, dim int, memoryDa
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
binVecField := field.(*storage.BinaryVectorFieldData)
if binVecField == nil {
log.Error("Binlog adapter: the in-memory field is not a binary vector field")
return errors.New("the in-memory field is not a binary vector field")
log.Error("Binlog adapter: the in-memory field is not a binary vector field",
zap.Int64("fieldID", fieldID), zap.Int32("shardID", shardID))
return fmt.Errorf("the in-memory field is not a binary vector field, field id: %d", fieldID)
}
if binVecField.Dim != dim {
log.Error("Binlog adapter: binary vector dimension mismatch", zap.Int("sourceDim", dim), zap.Int("schemaDim", binVecField.Dim))
return errors.New("binary vector dimension mismatch")
log.Error("Binlog adapter: binary vector dimension mismatch",
zap.Int("sourceDim", dim), zap.Int("schemaDim", binVecField.Dim))
return fmt.Errorf("binary vector dimension %d is not equal to schema dimension %d", dim, binVecField.Dim)
}
for j := 0; j < bytesPerVector; j++ {
val := data[bytesPerVector*i+j]
@ -989,8 +994,9 @@ func (p *BinlogAdapter) dispatchFloatVecToShards(data []float32, dim int, memory
// verify row count
count := len(data) / dim
if count != len(shardList) {
log.Error("Binlog adapter: float vector field row count is not equal to primary key", zap.Int("dataLen", count), zap.Int("shardLen", len(shardList)))
return errors.New("float vector field row count is not equal to primary key")
log.Error("Binlog adapter: float vector field row count is not equal to shard list row count",
zap.Int("dataLen", count), zap.Int("shardLen", len(shardList)))
return fmt.Errorf("float vector field row count %d is not equal to shard list row count %d", len(data), len(shardList))
}
// dispatch entities according to shard list
@ -1004,12 +1010,14 @@ func (p *BinlogAdapter) dispatchFloatVecToShards(data []float32, dim int, memory
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
floatVecField := field.(*storage.FloatVectorFieldData)
if floatVecField == nil {
log.Error("Binlog adapter: the in-memory field is not a float vector field")
return errors.New("the in-memory field is not a float vector field")
log.Error("Binlog adapter: the in-memory field is not a float vector field",
zap.Int64("fieldID", fieldID), zap.Int32("shardID", shardID))
return fmt.Errorf("the in-memory field is not a float vector field, field id: %d", fieldID)
}
if floatVecField.Dim != dim {
log.Error("Binlog adapter: float vector dimension mismatch", zap.Int("sourceDim", dim), zap.Int("schemaDim", floatVecField.Dim))
return errors.New("float vector dimension mismatch")
log.Error("Binlog adapter: float vector dimension mismatch",
zap.Int("sourceDim", dim), zap.Int("schemaDim", floatVecField.Dim))
return fmt.Errorf("binary vector dimension %d is not equal to schema dimension %d", dim, floatVecField.Dim)
}
for j := 0; j < dim; j++ {
val := data[dim*i+j]

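A recurring pattern in the importutil changes above and below is replacing bare errors.New and plain err returns with fmt.Errorf and the %w verb, so the message gains context while the original cause stays inspectable. A small self-contained illustration of that behavior (standard library only, not Milvus code):

package main

import (
	"errors"
	"fmt"
	"os"
)

// openDeltaLog mirrors the wrapping style used in the diff: the path is added to
// the message while the underlying error is kept in the chain via %w.
func openDeltaLog(path string) error {
	if _, err := os.Open(path); err != nil {
		return fmt.Errorf("failed to open delta log '%s', error: %w", path, err)
	}
	return nil
}

func main() {
	err := openDeltaLog("no-such-file.binlog")
	fmt.Println(err)                            // the message now carries the offending path
	fmt.Println(errors.Is(err, os.ErrNotExist)) // true: the wrapped cause is still detectable
}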

@ -181,6 +181,12 @@ func Test_NewBinlogAdapter(t *testing.T) {
assert.NotNil(t, adapter)
assert.Nil(t, err)
// amend blockSize, blockSize should be less than MaxSegmentSizeInMemory
adapter, err = NewBinlogAdapter(ctx, sampleSchema(), 2, MaxSegmentSizeInMemory+1, 1024, &MockChunkManager{}, flushFunc, 0, math.MaxUint64)
assert.NotNil(t, adapter)
assert.Nil(t, err)
assert.Equal(t, int64(MaxSegmentSizeInMemory), adapter.blockSize)
// no primary key
schema := &schemapb.CollectionSchema{
Name: "schema",


@ -19,6 +19,7 @@ package importutil
import (
"context"
"errors"
"fmt"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/log"
@ -59,13 +60,13 @@ func (p *BinlogFile) Open(filePath string) error {
bytes, err := p.chunkManager.Read(context.TODO(), filePath)
if err != nil {
log.Error("Binlog file: failed to open binlog", zap.String("filePath", filePath), zap.Error(err))
return err
return fmt.Errorf("failed to open binlog %s", filePath)
}
p.reader, err = storage.NewBinlogReader(bytes)
if err != nil {
log.Error("Binlog file: failed to initialize binlog reader", zap.String("filePath", filePath), zap.Error(err))
return err
return fmt.Errorf("failed to initialize binlog reader for binlog %s, error: %w", filePath, err)
}
log.Info("Binlog file: open binlog successfully", zap.String("filePath", filePath))
@ -101,7 +102,7 @@ func (p *BinlogFile) ReadBool() ([]bool, error) {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
// end of the file
@ -122,7 +123,7 @@ func (p *BinlogFile) ReadBool() ([]bool, error) {
data, err := event.PayloadReaderInterface.GetBoolFromPayload()
if err != nil {
log.Error("Binlog file: failed to read bool data", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to read bool data, error: %w", err)
}
result = append(result, data...)
@ -144,7 +145,7 @@ func (p *BinlogFile) ReadInt8() ([]int8, error) {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
// end of the file
@ -165,7 +166,7 @@ func (p *BinlogFile) ReadInt8() ([]int8, error) {
data, err := event.PayloadReaderInterface.GetInt8FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int8 data", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to read int8 data, error: %w", err)
}
result = append(result, data...)
@ -187,7 +188,7 @@ func (p *BinlogFile) ReadInt16() ([]int16, error) {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
// end of the file
@ -208,7 +209,7 @@ func (p *BinlogFile) ReadInt16() ([]int16, error) {
data, err := event.PayloadReaderInterface.GetInt16FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int16 data", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to read int16 data, error: %w", err)
}
result = append(result, data...)
@ -230,7 +231,7 @@ func (p *BinlogFile) ReadInt32() ([]int32, error) {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
// end of the file
@ -251,7 +252,7 @@ func (p *BinlogFile) ReadInt32() ([]int32, error) {
data, err := event.PayloadReaderInterface.GetInt32FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int32 data", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to read int32 data, error: %w", err)
}
result = append(result, data...)
@ -273,7 +274,7 @@ func (p *BinlogFile) ReadInt64() ([]int64, error) {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
// end of the file
@ -294,7 +295,7 @@ func (p *BinlogFile) ReadInt64() ([]int64, error) {
data, err := event.PayloadReaderInterface.GetInt64FromPayload()
if err != nil {
log.Error("Binlog file: failed to read int64 data", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to read int64 data, error: %w", err)
}
result = append(result, data...)
@ -316,7 +317,7 @@ func (p *BinlogFile) ReadFloat() ([]float32, error) {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
// end of the file
@ -337,7 +338,7 @@ func (p *BinlogFile) ReadFloat() ([]float32, error) {
data, err := event.PayloadReaderInterface.GetFloatFromPayload()
if err != nil {
log.Error("Binlog file: failed to read float data", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to read float data, error: %w", err)
}
result = append(result, data...)
@ -359,7 +360,7 @@ func (p *BinlogFile) ReadDouble() ([]float64, error) {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
// end of the file
@ -380,7 +381,7 @@ func (p *BinlogFile) ReadDouble() ([]float64, error) {
data, err := event.PayloadReaderInterface.GetDoubleFromPayload()
if err != nil {
log.Error("Binlog file: failed to read double data", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to read double data, error: %w", err)
}
result = append(result, data...)
@ -402,7 +403,7 @@ func (p *BinlogFile) ReadVarchar() ([]string, error) {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
// end of the file
@ -424,7 +425,7 @@ func (p *BinlogFile) ReadVarchar() ([]string, error) {
data, err := event.PayloadReaderInterface.GetStringFromPayload()
if err != nil {
log.Error("Binlog file: failed to read varchar data", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to read varchar data, error: %w", err)
}
result = append(result, data...)
@ -448,7 +449,7 @@ func (p *BinlogFile) ReadBinaryVector() ([]byte, int, error) {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, 0, err
return nil, 0, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
// end of the file
@ -469,7 +470,7 @@ func (p *BinlogFile) ReadBinaryVector() ([]byte, int, error) {
data, dimenson, err := event.PayloadReaderInterface.GetBinaryVectorFromPayload()
if err != nil {
log.Error("Binlog file: failed to read binary vector data", zap.Error(err))
return nil, 0, err
return nil, 0, fmt.Errorf("failed to read binary vector data, error: %w", err)
}
dim = dimenson
@ -494,7 +495,7 @@ func (p *BinlogFile) ReadFloatVector() ([]float32, int, error) {
event, err := p.reader.NextEventReader()
if err != nil {
log.Error("Binlog file: failed to iterate events reader", zap.Error(err))
return nil, 0, err
return nil, 0, fmt.Errorf("failed to iterate events reader, error: %w", err)
}
// end of the file
@ -515,7 +516,7 @@ func (p *BinlogFile) ReadFloatVector() ([]float32, int, error) {
data, dimension, err := event.PayloadReaderInterface.GetFloatVectorFromPayload()
if err != nil {
log.Error("Binlog file: failed to read float vector data", zap.Error(err))
return nil, 0, err
return nil, 0, fmt.Errorf("failed to read float vector data, error: %w", err)
}
dim = dimension


@ -118,7 +118,7 @@ func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoo
insertlogs, _, err := p.chunkManager.ListWithPrefix(context.TODO(), insertlogRoot, true)
if err != nil {
log.Error("Binlog parser: list insert logs error", zap.Error(err))
return nil, err
return nil, fmt.Errorf("failed to list insert logs with root path %s, error: %w", insertlogRoot, err)
}
// collect insert log paths
@ -129,16 +129,16 @@ func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoo
fieldStrID := path.Base(fieldPath)
fieldID, err := strconv.ParseInt(fieldStrID, 10, 64)
if err != nil {
log.Error("Binlog parser: parse field id error", zap.String("fieldPath", fieldPath), zap.Error(err))
return nil, err
log.Error("Binlog parser: failed to parse field id", zap.String("fieldPath", fieldPath), zap.Error(err))
return nil, fmt.Errorf("failed to parse field id from insert log path %s, error: %w", insertlog, err)
}
segmentPath := path.Dir(fieldPath)
segmentStrID := path.Base(segmentPath)
segmentID, err := strconv.ParseInt(segmentStrID, 10, 64)
if err != nil {
log.Error("Binlog parser: parse segment id error", zap.String("segmentPath", segmentPath), zap.Error(err))
return nil, err
log.Error("Binlog parser: failed to parse segment id", zap.String("segmentPath", segmentPath), zap.Error(err))
return nil, fmt.Errorf("failed to parse segment id from insert log path %s, error: %w", insertlog, err)
}
holder, ok := holders[segmentID]
@ -176,8 +176,8 @@ func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoo
// TODO add context
deltalogs, _, err := p.chunkManager.ListWithPrefix(context.TODO(), deltalogRoot, true)
if err != nil {
log.Error("Binlog parser: list delta logs error", zap.Error(err))
return nil, err
log.Error("Binlog parser: failed to list delta logs", zap.Error(err))
return nil, fmt.Errorf("failed to list delta logs, error: %w", err)
}
log.Info("Binlog parser: list delta logs", zap.Int("logsCount", len(deltalogs)))
@ -187,8 +187,8 @@ func (p *BinlogParser) constructSegmentHolders(insertlogRoot string, deltalogRoo
segmentStrID := path.Base(segmentPath)
segmentID, err := strconv.ParseInt(segmentStrID, 10, 64)
if err != nil {
log.Error("Binlog parser: parse segment id error", zap.String("segmentPath", segmentPath), zap.Error(err))
return nil, err
log.Error("Binlog parser: failed to parse segment id", zap.String("segmentPath", segmentPath), zap.Error(err))
return nil, fmt.Errorf("failed to parse segment id from delta log path %s, error: %w", deltalog, err)
}
// if the segment id doesn't exist, no need to process this deltalog
@ -219,7 +219,7 @@ func (p *BinlogParser) parseSegmentFiles(segmentHolder *SegmentFilesHolder) erro
MaxTotalSizeInMemory, p.chunkManager, p.callFlushFunc, p.tsStartPoint, p.tsEndPoint)
if err != nil {
log.Error("Binlog parser: failed to create binlog adapter", zap.Error(err))
return err
return fmt.Errorf("failed to create binlog adapter, error: %w", err)
}
return adapter.Read(segmentHolder)
@ -230,8 +230,8 @@ func (p *BinlogParser) parseSegmentFiles(segmentHolder *SegmentFilesHolder) erro
// 2. the delta log path of a partition (optional)
func (p *BinlogParser) Parse(filePaths []string) error {
if len(filePaths) != 1 && len(filePaths) != 2 {
log.Error("Binlog parser: illegal paths for binlog import")
return errors.New("illegal paths for binlog import, partition binlog path and partition delta path are required")
log.Error("Binlog parser: illegal paths for binlog import, partition binlog path and delta path are required")
return errors.New("illegal paths for binlog import, partition binlog path and delta path are required")
}
insertlogPath := filePaths[0]

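As the check above enforces, Parse accepts either one path (the partition's insert-log root) or two (insert-log root plus delta-log root). A hedged usage sketch with placeholder paths, assuming an initialized *BinlogParser named parser:

// The paths below are placeholders; a real call passes actual storage prefixes.
paths := []string{
	"backup/insert_log/<collectionID>/<partitionID>", // required: partition insert-log root
	"backup/delta_log/<collectionID>/<partitionID>",  // optional: partition delta-log root
}
if err := parser.Parse(paths); err != nil {
	log.Error("Binlog parser: import failed", zap.Error(err))
}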

@ -112,7 +112,7 @@ func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.Fi
NumRows: []int64{0},
}
default:
log.Error("Import util: unsupported data type", zap.Int("DataType", int(schema.DataType)))
log.Error("Import util: unsupported data type", zap.String("DataType", getTypeName(schema.DataType)))
return nil
}
}
@ -132,9 +132,7 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
case float64:
return nil
default:
s := fmt.Sprintf("%v", obj)
msg := "illegal numeric value " + s
return errors.New(msg)
return fmt.Errorf("illegal numeric value %v", obj)
}
}
@ -155,9 +153,7 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
case bool:
return nil
default:
s := fmt.Sprintf("%v", obj)
msg := "illegal value " + s + " for bool type field " + schema.GetName()
return errors.New(msg)
return fmt.Errorf("illegal value %v for bool type field '%s'", obj, schema.GetName())
}
}
@ -226,26 +222,21 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
switch vt := obj.(type) {
case []interface{}:
if len(vt)*8 != dim {
msg := "bit size " + strconv.Itoa(len(vt)*8) + " doesn't equal to vector dimension " + strconv.Itoa(dim) + " of field " + schema.GetName()
return errors.New(msg)
return fmt.Errorf("bit size %d doesn't equal to vector dimension %d of field '%s'", len(vt)*8, dim, schema.GetName())
}
for i := 0; i < len(vt); i++ {
if e := numericValidator(vt[i]); e != nil {
msg := e.Error() + " for binary vector field " + schema.GetName()
return errors.New(msg)
return fmt.Errorf("%s for binary vector field '%s'", e.Error(), schema.GetName())
}
t := int(vt[i].(float64))
if t > 255 || t < 0 {
msg := "illegal value " + strconv.Itoa(t) + " for binary vector field " + schema.GetName()
return errors.New(msg)
return fmt.Errorf("illegal value %d for binary vector field '%s'", t, schema.GetName())
}
}
return nil
default:
s := fmt.Sprintf("%v", obj)
msg := s + " is not an array for binary vector field " + schema.GetName()
return errors.New(msg)
return fmt.Errorf("%v is not an array for binary vector field '%s'", obj, schema.GetName())
}
}
@ -270,20 +261,16 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
switch vt := obj.(type) {
case []interface{}:
if len(vt) != dim {
msg := "array size " + strconv.Itoa(len(vt)) + " doesn't equal to vector dimension " + strconv.Itoa(dim) + " of field " + schema.GetName()
return errors.New(msg)
return fmt.Errorf("array size %d doesn't equal to vector dimension %d of field '%s'", len(vt), dim, schema.GetName())
}
for i := 0; i < len(vt); i++ {
if e := numericValidator(vt[i]); e != nil {
msg := e.Error() + " for float vector field " + schema.GetName()
return errors.New(msg)
return fmt.Errorf("%s for float vector field '%s'", e.Error(), schema.GetName())
}
}
return nil
default:
s := fmt.Sprintf("%v", obj)
msg := s + " is not an array for float vector field " + schema.GetName()
return errors.New(msg)
return fmt.Errorf("%v is not an array for float vector field '%s'", obj, schema.GetName())
}
}
@ -303,9 +290,7 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
case string:
return nil
default:
s := fmt.Sprintf("%v", obj)
msg := s + " is not a string for string type field " + schema.GetName()
return errors.New(msg)
return fmt.Errorf("%v is not a string for string type field '%s'", obj, schema.GetName())
}
}
@ -316,7 +301,7 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
return nil
}
default:
return errors.New("unsupport data type: " + strconv.Itoa(int(collectionSchema.Fields[i].DataType)))
return fmt.Errorf("unsupport data type: %s", getTypeName(collectionSchema.Fields[i].DataType))
}
}
@ -351,13 +336,13 @@ func getFieldDimension(schema *schemapb.FieldSchema) (int, error) {
if key == "dim" {
dim, err := strconv.Atoi(value)
if err != nil {
return 0, errors.New("vector dimension is invalid")
return 0, fmt.Errorf("illegal vector dimension '%s' for field '%s', error: %w", value, schema.GetName(), err)
}
return dim, nil
}
}
return 0, errors.New("vector dimension is not defined")
return 0, fmt.Errorf("vector dimension is not defined for field '%s'", schema.GetName())
}
// triggerGC triggers golang gc to return all free memory back to the underlying system at once,
@ -403,15 +388,15 @@ func tryFlushBlocks(ctx context.Context,
printFieldsDataInfo(blockData, "import util: prepare to force flush a block", nil)
err := callFlushFunc(blockData, i)
if err != nil {
log.Error("Import util: failed to force flush block data", zap.Int("shardID", i))
return err
log.Error("Import util: failed to force flush block data", zap.Int("shardID", i), zap.Error(err))
return fmt.Errorf("failed to force flush block data for shard id %d, error: %w", i, err)
}
log.Info("Import util: force flush", zap.Int("rowCount", rowCount), zap.Int("size", size), zap.Int("shardID", i))
blocksData[i] = initSegmentData(collectionSchema)
if blocksData[i] == nil {
log.Error("Import util: failed to initialize FieldData list")
return errors.New("failed to initialize FieldData list")
log.Error("Import util: failed to initialize FieldData list", zap.Int("shardID", i))
return fmt.Errorf("failed to initialize FieldData list for shard id %d", i)
}
continue
}
@ -422,16 +407,16 @@ func tryFlushBlocks(ctx context.Context,
printFieldsDataInfo(blockData, "import util: prepare to flush block larger than blockSize", nil)
err := callFlushFunc(blockData, i)
if err != nil {
log.Error("Import util: failed to flush block data", zap.Int("shardID", i))
return err
log.Error("Import util: failed to flush block data", zap.Int("shardID", i), zap.Error(err))
return fmt.Errorf("failed to flush block data for shard id %d, error: %w", i, err)
}
log.Info("Import util: block size exceed limit and flush", zap.Int("rowCount", rowCount),
zap.Int("size", size), zap.Int("shardID", i), zap.Int64("blockSize", blockSize))
blocksData[i] = initSegmentData(collectionSchema)
if blocksData[i] == nil {
log.Error("Import util: failed to initialize FieldData list")
return errors.New("failed to initialize FieldData list")
log.Error("Import util: failed to initialize FieldData list", zap.Int("shardID", i))
return fmt.Errorf("failed to initialize FieldData list for shard id %d", i)
}
continue
}
@ -467,15 +452,15 @@ func tryFlushBlocks(ctx context.Context,
err := callFlushFunc(blockData, biggestItem)
if err != nil {
log.Error("Import util: failed to flush biggest block data", zap.Int("shardID", biggestItem))
return err
return fmt.Errorf("failed to flush biggest block data for shard id %d, error: %w", biggestItem, err)
}
log.Info("Import util: total size exceed limit and flush", zap.Int("rowCount", rowCount),
zap.Int("size", size), zap.Int("totalSize", totalSize), zap.Int("shardID", biggestItem))
blocksData[biggestItem] = initSegmentData(collectionSchema)
if blocksData[biggestItem] == nil {
log.Error("Import util: failed to initialize FieldData list")
return errors.New("failed to initialize FieldData list")
log.Error("Import util: failed to initialize FieldData list", zap.Int("shardID", biggestItem))
return fmt.Errorf("failed to initialize FieldData list for shard id %d", biggestItem)
}
}
}

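getFieldDimension, touched above, reads the vector dimension from the field's "dim" type param and now reports which field has an illegal or missing value. A hedged sketch of the schema shape it expects (the field name is illustrative):

// Illustrative field schema; only the "dim" type param matters to getFieldDimension.
field := &schemapb.FieldSchema{
	Name:     "vector_field",
	DataType: schemapb.DataType_FloatVector,
	TypeParams: []*commonpb.KeyValuePair{
		{Key: "dim", Value: "128"},
	},
}
dim, err := getFieldDimension(field)
// dim == 128, err == nil; a non-numeric or absent "dim" yields a descriptive error instead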

@ -17,6 +17,7 @@ package importutil
import (
"context"
"errors"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
@ -201,6 +202,29 @@ func Test_InitSegmentData(t *testing.T) {
}
testFunc(sampleSchema())
testFunc(strKeySchema())
// unsupported data type
schema := &schemapb.CollectionSchema{
Name: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 101,
Name: "uid",
IsPrimaryKey: true,
AutoID: true,
DataType: schemapb.DataType_Int64,
},
{
FieldID: 102,
Name: "flag",
IsPrimaryKey: false,
DataType: schemapb.DataType_None,
},
},
}
data := initSegmentData(schema)
assert.Nil(t, data)
}
func Test_InitValidators(t *testing.T) {
@ -218,6 +242,9 @@ func Test_InitValidators(t *testing.T) {
name2ID[field.GetName()] = field.GetFieldID()
}
fields := initSegmentData(schema)
assert.NotNil(t, fields)
checkFunc := func(funcName string, validVal interface{}, invalidVal interface{}) {
id := name2ID[funcName]
v, ok := validators[id]
@ -226,6 +253,13 @@ func Test_InitValidators(t *testing.T) {
assert.Nil(t, err)
err = v.validateFunc(invalidVal)
assert.NotNil(t, err)
fieldData := fields[id]
preNum := fieldData.RowNum()
err = v.convertFunc(validVal, fieldData)
assert.Nil(t, err)
postNum := fieldData.RowNum()
assert.Equal(t, 1, postNum-preNum)
}
// validate functions
@ -279,7 +313,6 @@ func Test_InitValidators(t *testing.T) {
FieldID: 111,
Name: "field_float_vector",
IsPrimaryKey: false,
Description: "float_vector",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "aa"},
@ -295,8 +328,7 @@ func Test_InitValidators(t *testing.T) {
FieldID: 110,
Name: "field_binary_vector",
IsPrimaryKey: false,
Description: "float_vector",
DataType: schemapb.DataType_FloatVector,
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "aa"},
},
@ -304,6 +336,18 @@ func Test_InitValidators(t *testing.T) {
err = initValidators(schema, validators)
assert.NotNil(t, err)
// unsupported data type
schema.Fields = make([]*schemapb.FieldSchema, 0)
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
FieldID: 110,
Name: "dummy",
IsPrimaryKey: false,
DataType: schemapb.DataType_None,
})
err = initValidators(schema, validators)
assert.NotNil(t, err)
}
func Test_GetFileNameAndExt(t *testing.T) {
@ -421,6 +465,18 @@ func Test_TryFlushBlocks(t *testing.T) {
assert.Equal(t, 2, flushCounter)
assert.Equal(t, 20, flushRowCount)
// call flush function failed
flushFunc = func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
return errors.New("error")
}
segmentsData = createSegmentsData(fieldsData, shardNum)
err = tryFlushBlocks(ctx, segmentsData, sampleSchema(), flushFunc, blockSize, maxTotalSize, true) // failed to force flush
assert.Error(t, err)
err = tryFlushBlocks(ctx, segmentsData, sampleSchema(), flushFunc, 1, maxTotalSize, false) // failed to flush block larger than blockSize
assert.Error(t, err)
err = tryFlushBlocks(ctx, segmentsData, sampleSchema(), flushFunc, blockSize, maxTotalSize, false) // failed to flush biggest block
assert.Error(t, err)
// canceled
cancel()
flushCounter = 0

View File

@ -106,8 +106,9 @@ type ImportWrapper struct {
createBinlogsFunc CreateBinlogsFunc // function to create binlog for a segment
saveSegmentFunc SaveSegmentFunc // function to persist a segment
importResult *rootcoordpb.ImportResult // import result
reportFunc func(res *rootcoordpb.ImportResult) error // report import state to rootcoord
importResult *rootcoordpb.ImportResult // import result
reportFunc func(res *rootcoordpb.ImportResult) error // report import state to rootcoord
	reportImportAttempts uint // number of retry attempts when the report function returns an error
	workingSegments map[int]*WorkingSegment // a map of shard id to working segments
}
@ -138,16 +139,17 @@ func NewImportWrapper(ctx context.Context, collectionSchema *schemapb.Collection
ctx, cancel := context.WithCancel(ctx)
wrapper := &ImportWrapper{
ctx: ctx,
cancel: cancel,
collectionSchema: realSchema,
shardNum: shardNum,
segmentSize: segmentSize,
rowIDAllocator: idAlloc,
chunkManager: cm,
importResult: importResult,
reportFunc: reportFunc,
workingSegments: make(map[int]*WorkingSegment),
ctx: ctx,
cancel: cancel,
collectionSchema: realSchema,
shardNum: shardNum,
segmentSize: segmentSize,
rowIDAllocator: idAlloc,
chunkManager: cm,
importResult: importResult,
reportFunc: reportFunc,
reportImportAttempts: ReportImportAttempts,
workingSegments: make(map[int]*WorkingSegment),
}
return wrapper
@ -156,17 +158,17 @@ func NewImportWrapper(ctx context.Context, collectionSchema *schemapb.Collection
func (p *ImportWrapper) SetCallbackFunctions(assignSegmentFunc AssignSegmentFunc, createBinlogsFunc CreateBinlogsFunc, saveSegmentFunc SaveSegmentFunc) error {
if assignSegmentFunc == nil {
log.Error("import wrapper: callback function AssignSegmentFunc is nil")
return fmt.Errorf("import wrapper: callback function AssignSegmentFunc is nil")
return fmt.Errorf("callback function AssignSegmentFunc is nil")
}
if createBinlogsFunc == nil {
log.Error("import wrapper: callback function CreateBinlogsFunc is nil")
return fmt.Errorf("import wrapper: callback function CreateBinlogsFunc is nil")
return fmt.Errorf("callback function CreateBinlogsFunc is nil")
}
if saveSegmentFunc == nil {
log.Error("import wrapper: callback function SaveSegmentFunc is nil")
return fmt.Errorf("import wrapper: callback function SaveSegmentFunc is nil")
return fmt.Errorf("callback function SaveSegmentFunc is nil")
}
p.assignSegmentFunc = assignSegmentFunc
@ -201,7 +203,7 @@ func (p *ImportWrapper) validateColumnBasedFiles(filePaths []string, collectionS
_, ok := requiredFieldNames[name]
if !ok {
log.Error("import wrapper: the file has no corresponding field in collection", zap.String("fieldName", name))
return fmt.Errorf("import wrapper: the file '%s' has no corresponding field in collection", filePath)
return fmt.Errorf("the file '%s' has no corresponding field in collection", filePath)
}
}
@ -210,7 +212,7 @@ func (p *ImportWrapper) validateColumnBasedFiles(filePaths []string, collectionS
_, ok := fileNames[name]
if !ok {
log.Error("import wrapper: there is no file corresponding to field", zap.String("fieldName", name))
return fmt.Errorf("import wrapper: there is no file corresponding to field '%s'", name)
return fmt.Errorf("there is no file corresponding to field '%s'", name)
}
}
@ -232,8 +234,8 @@ func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
// only allow json file or numpy file
if fileType != JSONFileExt && fileType != NumpyFileExt {
log.Error("import wrapper: unsupportted file type", zap.String("filePath", filePath))
return false, fmt.Errorf("import wrapper: unsupportted file type: '%s'", filePath)
log.Error("import wrapper: unsupported file type", zap.String("filePath", filePath))
return false, fmt.Errorf("unsupported file type: '%s'", filePath)
}
// we use the first file to determine row-based or column-based
@ -246,12 +248,12 @@ func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
if rowBased {
if fileType != JSONFileExt {
log.Error("import wrapper: unsupported file type for row-based mode", zap.String("filePath", filePath))
return rowBased, fmt.Errorf("import wrapper: unsupported file type for row-based mode: '%s'", filePath)
return rowBased, fmt.Errorf("unsupported file type for row-based mode: '%s'", filePath)
}
} else {
if fileType != NumpyFileExt {
log.Error("import wrapper: unsupported file type for column-based mode", zap.String("filePath", filePath))
return rowBased, fmt.Errorf("import wrapper: unsupported file type for column-based mode: '%s'", filePath)
return rowBased, fmt.Errorf("unsupported file type for column-based mode: '%s'", filePath)
}
}
@ -259,7 +261,7 @@ func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
_, ok := fileNames[name]
if ok {
log.Error("import wrapper: duplicate file name", zap.String("filePath", filePath))
return rowBased, fmt.Errorf("import wrapper: duplicate file: '%s'", filePath)
return rowBased, fmt.Errorf("duplicate file: '%s'", filePath)
}
fileNames[name] = struct{}{}
@ -267,19 +269,19 @@ func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
size, err := p.chunkManager.Size(p.ctx, filePath)
if err != nil {
log.Error("import wrapper: failed to get file size", zap.String("filePath", filePath), zap.Error(err))
return rowBased, fmt.Errorf("import wrapper: failed to get file size of '%s', make sure the input path is related path, error:%w", filePath, err)
return rowBased, fmt.Errorf("failed to get file size of '%s', error:%w", filePath, err)
}
// empty file
if size == 0 {
log.Error("import wrapper: file size is zero", zap.String("filePath", filePath))
return rowBased, fmt.Errorf("import wrapper: the file '%s' size is zero", filePath)
return rowBased, fmt.Errorf("the file '%s' size is zero", filePath)
}
if size > MaxFileSize {
log.Error("import wrapper: file size exceeds the maximum size", zap.String("filePath", filePath),
zap.Int64("fileSize", size), zap.Int64("MaxFileSize", MaxFileSize))
return rowBased, fmt.Errorf("import wrapper: the file '%s' size exceeds the maximum size: %d bytes", filePath, MaxFileSize)
return rowBased, fmt.Errorf("the file '%s' size exceeds the maximum size: %d bytes", filePath, MaxFileSize)
}
totalSize += size
}
@ -287,7 +289,7 @@ func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
	// especially for column-based, total size of files cannot exceed MaxTotalSizeInMemory
if totalSize > MaxTotalSizeInMemory {
log.Error("import wrapper: total size of files exceeds the maximum size", zap.Int64("totalSize", totalSize), zap.Int64("MaxTotalSize", MaxTotalSizeInMemory))
return rowBased, fmt.Errorf("import wrapper: total size(%d bytes) of all files exceeds the maximum size: %d bytes", totalSize, MaxTotalSizeInMemory)
return rowBased, fmt.Errorf("total size(%d bytes) of all files exceeds the maximum size: %d bytes", totalSize, MaxTotalSizeInMemory)
}
// check redundant files for column-based import
@ -348,7 +350,7 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
fieldsData := initSegmentData(p.collectionSchema)
if fieldsData == nil {
log.Error("import wrapper: failed to initialize FieldData list")
return fmt.Errorf("import wrapper: failed to initialize FieldData list")
return fmt.Errorf("failed to initialize FieldData list")
}
rowCount := 0
@ -384,7 +386,7 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
// check the row count. only count non-zero row fields
if rowCount > 0 && rowCount != v.RowNum() {
return fmt.Errorf("the field %d row count %d doesn't equal others row count: %d", k, v.RowNum(), rowCount)
return fmt.Errorf("the field %d row count %d doesn't equal to others row count: %d", k, v.RowNum(), rowCount)
}
rowCount = v.RowNum()
@ -425,11 +427,11 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
triggerGC()
}
return p.reportPersisted()
return p.reportPersisted(p.reportImportAttempts)
}
// reportPersisted notify the rootcoord to mark the task state to be ImportPersisted
func (p *ImportWrapper) reportPersisted() error {
func (p *ImportWrapper) reportPersisted(reportAttempts uint) error {
// force close all segments
err := p.closeAllWorkingSegments()
if err != nil {
@ -441,7 +443,7 @@ func (p *ImportWrapper) reportPersisted() error {
	// the persisted-state report is valuable, retry more times in case this task fails only because of a network error
reportErr := retry.Do(p.ctx, func() error {
return p.reportFunc(p.importResult)
}, retry.Attempts(ReportImportAttempts))
}, retry.Attempts(reportAttempts))
if reportErr != nil {
log.Warn("import wrapper: fail to report import state to RootCoord", zap.Error(reportErr))
return reportErr
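The report call above is retried a configurable number of times (ReportImportAttempts by default, lowered to 2 in the tests further down) so a transient network error does not fail the whole task. A self-contained sketch of the same retry-with-attempts pattern; retryDo here is a hypothetical stand-in for the project's retry.Do helper, not its actual implementation:

	package main

	import (
		"context"
		"errors"
		"fmt"
	)

	// retryDo re-runs fn up to `attempts` times, stopping early on success
	// or when the context is canceled.
	func retryDo(ctx context.Context, fn func() error, attempts uint) error {
		var err error
		for i := uint(0); i < attempts; i++ {
			if err = fn(); err == nil {
				return nil
			}
			if ctx.Err() != nil {
				return ctx.Err()
			}
		}
		return err
	}

	func main() {
		calls := 0
		report := func() error { // stands in for reportFunc(importResult)
			calls++
			return errors.New("mock network error")
		}
		// tests lower the attempt count to 2 so a failing report returns quickly
		err := retryDo(context.Background(), report, 2)
		fmt.Println(calls, err) // 2 mock network error
	}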
@ -520,7 +522,7 @@ func (p *ImportWrapper) doBinlogImport(filePaths []string, tsStartPoint uint64,
return err
}
return p.reportPersisted()
return p.reportPersisted(p.reportImportAttempts)
}
// parseRowBasedJSON is the entry of row-based json import operation
@ -700,7 +702,7 @@ func (p *ImportWrapper) appendFunc(schema *schemapb.FieldSchema) func(src storag
func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.FieldData, blockSize int64) error {
if len(fieldsData) == 0 {
log.Error("import wrapper: fields data is empty")
return fmt.Errorf("import wrapper: fields data is empty")
return fmt.Errorf("fields data is empty")
}
tr := timerecord.NewTimeRecorder("import wrapper: split field data")
@ -722,7 +724,7 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
v, ok := fieldsData[schema.GetFieldID()]
if !ok {
log.Error("import wrapper: field not provided", zap.String("fieldName", schema.GetName()))
return fmt.Errorf("import wrapper: field '%s' not provided", schema.GetName())
return fmt.Errorf("field '%s' not provided", schema.GetName())
}
rowCounter[schema.GetName()] = v.RowNum()
if v.RowNum() > rowCount {
@ -732,14 +734,14 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
}
if primaryKey == nil {
log.Error("import wrapper: primary key field is not found")
return fmt.Errorf("import wrapper: primary key field is not found")
return fmt.Errorf("primary key field is not found")
}
for name, count := range rowCounter {
if count != rowCount {
log.Error("import wrapper: field row count is not equal to other fields row count", zap.String("fieldName", name),
zap.Int("rowCount", count), zap.Int("otherRowCount", rowCount))
return fmt.Errorf("import wrapper: field '%s' row count %d is not equal to other fields row count: %d", name, count, rowCount)
return fmt.Errorf("field '%s' row count %d is not equal to other fields row count: %d", name, count, rowCount)
}
}
log.Info("import wrapper: try to split a block with row count", zap.Int("rowCount", rowCount))
@ -747,14 +749,14 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
primaryData, ok := fieldsData[primaryKey.GetFieldID()]
if !ok {
log.Error("import wrapper: primary key field is not provided", zap.String("keyName", primaryKey.GetName()))
return fmt.Errorf("import wrapper: primary key field is not provided")
return fmt.Errorf("primary key field is not provided")
}
// generate auto id for primary key and rowid field
rowIDBegin, rowIDEnd, err := p.rowIDAllocator.Alloc(uint32(rowCount))
if err != nil {
log.Error("import wrapper: failed to alloc row ID", zap.Error(err))
return err
return fmt.Errorf("failed to alloc row ID, error: %w", err)
}
rowIDField := fieldsData[common.RowIDField]
@ -781,8 +783,8 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
}
if primaryData.RowNum() <= 0 {
log.Error("import wrapper: primary key not provided", zap.String("keyName", primaryKey.GetName()))
return fmt.Errorf("import wrapper: the primary key '%s' not provided", primaryKey.GetName())
log.Error("import wrapper: primary key is not provided", zap.String("keyName", primaryKey.GetName()))
return fmt.Errorf("the primary key '%s' is not provided", primaryKey.GetName())
}
	// prepare segments
@ -791,7 +793,7 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
segmentData := initSegmentData(p.collectionSchema)
if segmentData == nil {
log.Error("import wrapper: failed to initialize FieldData list")
return fmt.Errorf("import wrapper: failed to initialize FieldData list")
return fmt.Errorf("failed to initialize FieldData list")
}
segmentsData = append(segmentsData, segmentData)
}
@ -803,7 +805,7 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
appendFuncErr := p.appendFunc(schema)
if appendFuncErr == nil {
log.Error("import wrapper: unsupported field data type")
return fmt.Errorf("import wrapper: unsupported field data type")
return fmt.Errorf("unsupported field data type: %d", schema.GetDataType())
}
appendFunctions[schema.GetName()] = appendFuncErr
}
@ -821,7 +823,7 @@ func (p *ImportWrapper) splitFieldsData(fieldsData map[storage.FieldID]storage.F
intPK, ok := interface{}(pk).(int64)
if !ok {
log.Error("import wrapper: primary key field must be int64 or varchar")
return fmt.Errorf("import wrapper: primary key field must be int64 or varchar")
return fmt.Errorf("primary key field must be int64 or varchar")
}
hash, _ := typeutil.Hash32Int64(intPK)
shard = hash % uint32(p.shardNum)
@ -891,7 +893,7 @@ func (p *ImportWrapper) flushFunc(fields map[storage.FieldID]storage.FieldData,
segID, channelName, err := p.assignSegmentFunc(shardID)
if err != nil {
log.Error("import wrapper: failed to assign a new segment", zap.Error(err), zap.Int("shardID", shardID))
return err
return fmt.Errorf("failed to assign a new segment for shard id %d, error: %w", shardID, err)
}
segment = &WorkingSegment{
@ -911,7 +913,8 @@ func (p *ImportWrapper) flushFunc(fields map[storage.FieldID]storage.FieldData,
if err != nil {
log.Error("import wrapper: failed to save binlogs", zap.Error(err), zap.Int("shardID", shardID),
zap.Int64("segmentID", segment.segmentID), zap.String("targetChannel", segment.targetChName))
return err
return fmt.Errorf("failed to save binlogs, shard id %d, segment id %d, channel '%s', error: %w",
shardID, segment.segmentID, segment.targetChName, err)
}
segment.fieldsInsert = append(segment.fieldsInsert, fieldsInsert...)
@ -934,12 +937,13 @@ func (p *ImportWrapper) closeWorkingSegment(segment *WorkingSegment) error {
err := p.saveSegmentFunc(segment.fieldsInsert, segment.fieldsStats, segment.segmentID, segment.targetChName, segment.rowCount)
if err != nil {
log.Error("import wrapper: failed to save segment",
log.Error("import wrapper: failed to seal segment",
zap.Error(err),
zap.Int("shardID", segment.shardID),
zap.Int64("segmentID", segment.segmentID),
zap.String("targetChannel", segment.targetChName))
return err
return fmt.Errorf("failed to seal segment, shard id %d, segment id %d, channel '%s', error: %w",
segment.shardID, segment.segmentID, segment.targetChName, err)
}
return nil

View File

@ -429,28 +429,25 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) {
assert.Equal(t, 5, rowCounter.rowCount)
assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State)
// parse error
content := []byte(`{
"field_bool": [true, false, true, true, true]
}`)
filePath := "rows_2.json"
// row count of fields not equal
filePath := "field_int8.npy"
content, err := CreateNumpyData([]int8{10})
assert.Nil(t, err)
err = cm.Write(ctx, filePath, content)
assert.NoError(t, err)
files[1] = filePath
importResult.State = commonpb.ImportState_ImportStarted
wrapper = NewImportWrapper(ctx, sampleSchema(), 2, 1, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
files = make([]string, 0)
files = append(files, filePath)
err = wrapper.Import(files, DefaultImportOptions())
assert.NotNil(t, err)
assert.NotEqual(t, commonpb.ImportState_ImportPersisted, importResult.State)
// file doesn't exist
files = make([]string, 0)
files = append(files, "/dummy/dummy.json")
files = append(files, "/dummy/dummy.npy")
err = wrapper.Import(files, DefaultImportOptions())
assert.NotNil(t, err)
}
@ -819,6 +816,7 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) {
files := make([]string, 0)
files = append(files, filePath)
wrapper.reportImportAttempts = 2
wrapper.reportFunc = func(res *rootcoordpb.ImportResult) error {
return errors.New("mock error")
}
@ -865,6 +863,7 @@ func Test_ImportWrapperReportFailColumnBased_numpy(t *testing.T) {
wrapper := NewImportWrapper(ctx, schema, 2, 1, idAllocator, cm, importResult, reportFunc)
wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
wrapper.reportImportAttempts = 2
wrapper.reportFunc = func(res *rootcoordpb.ImportResult) error {
return errors.New("mock error")
}
@ -1131,3 +1130,51 @@ func Test_ImportWrapperSplitFieldsData(t *testing.T) {
assert.Equal(t, 2, rowCounter.callTime)
assert.Equal(t, rowCount, rowCounter.rowCount)
}
func Test_ImportWrapperReportPersisted(t *testing.T) {
ctx := context.Background()
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: 1,
DatanodeId: 1,
State: commonpb.ImportState_ImportStarted,
Segments: make([]int64, 0),
AutoIds: make([]int64, 0),
RowCount: 0,
}
reportFunc := func(res *rootcoordpb.ImportResult) error {
return nil
}
wrapper := NewImportWrapper(ctx, sampleSchema(), int32(2), int64(1024), nil, nil, importResult, reportFunc)
assert.NotNil(t, wrapper)
rowCounter := &rowCounterTest{}
assignSegmentFunc, flushFunc, saveSegmentFunc := createMockCallbackFunctions(t, rowCounter)
err := wrapper.SetCallbackFunctions(assignSegmentFunc, flushFunc, saveSegmentFunc)
assert.Nil(t, err)
// success
err = wrapper.reportPersisted(2)
assert.Nil(t, err)
// error when closing segments
wrapper.saveSegmentFunc = func(fieldsInsert []*datapb.FieldBinlog, fieldsStats []*datapb.FieldBinlog, segmentID int64, targetChName string, rowCount int64) error {
return errors.New("error")
}
wrapper.workingSegments[0] = &WorkingSegment{}
err = wrapper.reportPersisted(2)
assert.Error(t, err)
// failed to report
wrapper.saveSegmentFunc = func(fieldsInsert []*datapb.FieldBinlog, fieldsStats []*datapb.FieldBinlog, segmentID int64, targetChName string, rowCount int64) error {
return nil
}
wrapper.reportFunc = func(res *rootcoordpb.ImportResult) error {
return errors.New("error")
}
err = wrapper.reportPersisted(2)
assert.Error(t, err)
}

View File

@ -163,7 +163,7 @@ func NewJSONRowConsumer(collectionSchema *schemapb.CollectionSchema, idAlloc *al
err := initValidators(collectionSchema, v.validators)
if err != nil {
log.Error("JSON row consumer: fail to initialize json row-based consumer", zap.Error(err))
return nil, fmt.Errorf("fail to initialize json row-based consumer: %v", err)
return nil, fmt.Errorf("fail to initialize json row-based consumer, error: %w", err)
}
v.segmentsData = make([]map[storage.FieldID]storage.FieldData, 0, shardNum)
@ -171,7 +171,7 @@ func NewJSONRowConsumer(collectionSchema *schemapb.CollectionSchema, idAlloc *al
segmentData := initSegmentData(collectionSchema)
if segmentData == nil {
log.Error("JSON row consumer: fail to initialize in-memory segment data", zap.Int("shardID", i))
return nil, fmt.Errorf("fail to initialize in-memory segment data for shardID %d", i)
return nil, fmt.Errorf("fail to initialize in-memory segment data for shard id %d", i)
}
v.segmentsData = append(v.segmentsData, segmentData)
}
@ -267,7 +267,7 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
err := v.flush(false)
if err != nil {
log.Error("JSON row consumer: try flush data but failed", zap.Error(err))
return fmt.Errorf("try flush data but failed: %s", err.Error())
return fmt.Errorf("try flush data but failed, error: %w", err)
}
// prepare autoid, no matter int64 or varchar pk, we always generate autoid since the hidden field RowIDField requires them
@ -279,7 +279,7 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
rowIDBegin, rowIDEnd, err = v.rowIDAllocator.Alloc(uint32(len(rows)))
if err != nil {
log.Error("JSON row consumer: failed to generate primary keys", zap.Int("count", len(rows)), zap.Error(err))
return fmt.Errorf("failed to generate %d primary keys: %s", len(rows), err.Error())
return fmt.Errorf("failed to generate %d primary keys, error: %w", len(rows), err)
}
if rowIDEnd-rowIDBegin != int64(len(rows)) {
log.Error("JSON row consumer: try to generate primary keys but allocated ids are not enough",
@ -326,7 +326,7 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
if err != nil {
log.Error("JSON row consumer: failed to hash primary key at the row",
zap.Int64("key", pk), zap.Int64("rowNumber", v.rowCounter+int64(i)), zap.Error(err))
return fmt.Errorf("failed to hash primary key %d at the row %d, error: %s", pk, v.rowCounter+int64(i), err.Error())
return fmt.Errorf("failed to hash primary key %d at the row %d, error: %w", pk, v.rowCounter+int64(i), err)
}
shard = hash % uint32(v.shardNum)
@ -349,8 +349,8 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
if err := validator.convertFunc(value, v.segmentsData[shard][name]); err != nil {
log.Error("JSON row consumer: failed to convert value for field at the row",
zap.String("fieldName", validator.fieldName), zap.Int64("rowNumber", v.rowCounter+int64(i)), zap.Error(err))
return fmt.Errorf("failed to convert value for field %s at the row %d, error: %s",
validator.fieldName, v.rowCounter+int64(i), err.Error())
return fmt.Errorf("failed to convert value for field '%s' at the row %d, error: %w",
validator.fieldName, v.rowCounter+int64(i), err)
}
}
}

View File

@ -301,6 +301,7 @@ func Test_JSONRowConsumerFlush(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, shardNum, callTime)
assert.Equal(t, rowCountEachShard*int(shardNum), totalCount)
assert.Equal(t, 0, len(consumer.IDRange())) // not auto-generated id, no id range
// execeed block size trigger flush
callTime = 0
@ -320,6 +321,7 @@ func Test_JSONRowConsumerFlush(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, shardNum/2, callTime)
assert.Equal(t, rowCountEachShard*int(shardNum)/2, totalCount)
assert.Equal(t, 0, len(consumer.IDRange())) // not auto-generated id, no id range
}
func Test_JSONRowConsumerHandle(t *testing.T) {
@ -346,6 +348,10 @@ func Test_JSONRowConsumerHandle(t *testing.T) {
},
}
var consumer *JSONRowConsumer
err := consumer.Handle(nil)
assert.Error(t, err)
t.Run("handle int64 pk", func(t *testing.T) {
consumer, err := NewJSONRowConsumer(schema, idAllocator, 1, 1, flushFunc)
assert.NotNil(t, consumer)

View File

@ -99,12 +99,12 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
t, err := dec.Token()
if err != nil {
log.Error("JSON parser: row count is 0")
return errors.New("JSON parser: row count is 0")
log.Error("JSON parser: failed to decode the JSON file", zap.Error(err))
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
}
if t != json.Delim('{') {
log.Error("JSON parser: invalid JSON format, the content should be started with'{'")
return errors.New("JSON parser: invalid JSON format, the content should be started with'{'")
return errors.New("invalid JSON format, the content should be started with'{'")
}
// read the first level
@ -113,28 +113,28 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
// read the key
t, err := dec.Token()
if err != nil {
log.Error("JSON parser: read json token error", zap.Error(err))
return fmt.Errorf("JSON parser: read json token error: %v", err)
log.Error("JSON parser: failed to decode the JSON file", zap.Error(err))
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
}
key := t.(string)
keyLower := strings.ToLower(key)
// the root key should be RowRootNode
if keyLower != RowRootNode {
log.Error("JSON parser: invalid row-based JSON format, the key is not found", zap.String("key", key))
return fmt.Errorf("JSON parser: invalid row-based JSON format, the key %s is not found", key)
log.Error("JSON parser: invalid JSON format, the root key is not found", zap.String("RowRootNode", RowRootNode), zap.String("key", key))
return fmt.Errorf("invalid JSON format, the root key should be '%s', but get '%s'", RowRootNode, key)
}
// started by '['
t, err = dec.Token()
if err != nil {
log.Error("JSON parser: read json token error", zap.Error(err))
return fmt.Errorf("JSON parser: read json token error: %v", err)
log.Error("JSON parser: failed to decode the JSON file", zap.Error(err))
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
}
if t != json.Delim('[') {
log.Error("JSON parser: invalid row-based JSON format, rows list should begin with '['")
return errors.New("JSON parser: invalid row-based JSON format, rows list should begin with '['")
log.Error("JSON parser: invalid JSON format, rows list should begin with '['")
return errors.New("invalid JSON format, rows list should begin with '['")
}
// read buffer
@ -142,8 +142,8 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
for dec.More() {
var value interface{}
if err := dec.Decode(&value); err != nil {
log.Error("JSON parser: decode json value error", zap.Error(err))
return fmt.Errorf("JSON parser: decode json value error: %v", err)
log.Error("JSON parser: failed to parse row value", zap.Error(err))
return fmt.Errorf("failed to parse row value, error: %w", err)
}
switch value.(type) {
@ -151,7 +151,7 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
break
default:
log.Error("JSON parser: invalid JSON format, each row should be a key-value map")
return errors.New("JSON parser: invalid JSON format, each row should be a key-value map")
return errors.New("invalid JSON format, each row should be a key-value map")
}
row := make(map[storage.FieldID]interface{})
@ -161,7 +161,7 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
fieldID, ok := p.name2FieldID[k]
if !ok {
log.Error("JSON parser: the field is not defined in collection schema", zap.String("fieldName", k))
return fmt.Errorf("JSON parser: the field '%s' is not defined in collection schema", k)
return fmt.Errorf("the field '%s' is not defined in collection schema", k)
}
row[fieldID] = v
}
@ -170,8 +170,8 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
if len(buf) >= int(p.bufSize) {
isEmpty = false
if err = handler.Handle(buf); err != nil {
log.Error("JSON parser: parse values error", zap.Error(err))
return fmt.Errorf("JSON parser: parse values error: %v", err)
log.Error("JSON parser: failed to convert row value to entity", zap.Error(err))
return fmt.Errorf("failed to convert row value to entity, error: %w", err)
}
// clear the buffer
@ -183,27 +183,27 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
if len(buf) > 0 {
isEmpty = false
if err = handler.Handle(buf); err != nil {
log.Error("JSON parser: parse values error", zap.Error(err))
return fmt.Errorf("JSON parser: parse values error: %v", err)
log.Error("JSON parser: failed to convert row value to entity", zap.Error(err))
return fmt.Errorf("failed to convert row value to entity, error: %w", err)
}
}
// end by ']'
t, err = dec.Token()
if err != nil {
log.Error("JSON parser: read json token error", zap.Error(err))
return fmt.Errorf("JSON parser: read json token error: %v", err)
log.Error("JSON parser: failed to decode the JSON file", zap.Error(err))
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
}
if t != json.Delim(']') {
log.Error("JSON parser: invalid column-based JSON format, rows list should end with a ']'")
return errors.New("JSON parser: invalid column-based JSON format, rows list should end with a ']'")
log.Error("JSON parser: invalid JSON format, rows list should end with a ']'")
return errors.New("invalid JSON format, rows list should end with a ']'")
}
// outside context might be canceled(service stop, or future enhancement for canceling import task)
if isCanceled(p.ctx) {
log.Error("JSON parser: import task was canceled")
return errors.New("JSON parser: import task was canceled")
return errors.New("import task was canceled")
}
// this break means we require the first node must be RowRootNode
@ -213,7 +213,7 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error {
if isEmpty {
log.Error("JSON parser: row count is 0")
return errors.New("JSON parser: row count is 0")
return errors.New("row count is 0")
}
// send nil to notify the handler all have done
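ParseRows above treats the JSON file as a token stream instead of loading it whole: it expects '{', then the "rows" key, then '[', and decodes one row object at a time, handing batches to the handler. A minimal sketch of the same streaming pattern using encoding/json (the field names are placeholders, not a collection schema):

	package main

	import (
		"encoding/json"
		"fmt"
		"strings"
	)

	func main() {
		input := `{"rows": [{"uid": 1}, {"uid": 2}, {"uid": 3}]}`
		dec := json.NewDecoder(strings.NewReader(input))

		// consume the '{' delimiter, the "rows" key, and the '[' delimiter
		for i := 0; i < 3; i++ {
			if _, err := dec.Token(); err != nil {
				panic(fmt.Errorf("failed to decode the JSON file, error: %w", err))
			}
		}

		// decode one row object at a time; a real parser would buffer rows
		// and hand batches to a handler instead of printing them
		for dec.More() {
			var row map[string]interface{}
			if err := dec.Decode(&row); err != nil {
				panic(fmt.Errorf("failed to parse row value, error: %w", err))
			}
			fmt.Println(row)
		}
	}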

View File

@ -136,8 +136,8 @@ func convertNumpyType(typeStr string) (schemapb.DataType, error) {
if isStringType(typeStr) {
return schemapb.DataType_VarChar, nil
}
log.Error("Numpy adapter: the numpy file data type is not supported", zap.String("dataType", typeStr))
return schemapb.DataType_None, fmt.Errorf("Numpy adapter: the numpy file dtype '%s' is not supported", typeStr)
log.Error("Numpy adapter: the numpy file data type is not supported", zap.String("dtype", typeStr))
return schemapb.DataType_None, fmt.Errorf("the numpy file dtype '%s' is not supported", typeStr)
}
}
@ -179,7 +179,8 @@ func stringLen(dtype string) (int, bool, error) {
return v, utf, nil
}
return 0, false, fmt.Errorf("Numpy adapter: data type '%s' of numpy file is not varchar data type", dtype)
log.Error("Numpy adapter: the numpy file dtype is not varchar data type", zap.String("dtype", dtype))
return 0, false, fmt.Errorf("dtype '%s' of numpy file is not varchar data type", dtype)
}
func isStringType(typeStr string) bool {
@ -251,25 +252,25 @@ func (n *NumpyAdapter) checkCount(count int) int {
func (n *NumpyAdapter) ReadBool(count int) ([]bool, error) {
if count <= 0 {
return nil, errors.New("Numpy adapter: cannot read bool data with a zero or nagative count")
return nil, errors.New("cannot read bool data with a zero or nagative count")
}
// incorrect type
if n.dataType != schemapb.DataType_Bool {
return nil, errors.New("Numpy adapter: numpy data is not bool type")
return nil, errors.New("numpy data is not bool type")
}
// avoid read overflow
readSize := n.checkCount(count)
if readSize <= 0 {
return nil, errors.New("Numpy adapter: end of bool file, nothing to read")
return nil, errors.New("end of bool file, nothing to read")
}
// read data
data := make([]bool, readSize)
err := binary.Read(n.reader, n.order, &data)
if err != nil {
return nil, fmt.Errorf("Numpy adapter: failed to read bool data with count %d, error: %w", readSize, err)
return nil, fmt.Errorf(" failed to read bool data with count %d, error: %w", readSize, err)
}
// update read position after successfully read
@ -280,7 +281,7 @@ func (n *NumpyAdapter) ReadBool(count int) ([]bool, error) {
func (n *NumpyAdapter) ReadUint8(count int) ([]uint8, error) {
if count <= 0 {
return nil, errors.New("Numpy adapter: cannot read uint8 data with a zero or nagative count")
return nil, errors.New("cannot read uint8 data with a zero or nagative count")
}
// incorrect type
@ -288,20 +289,20 @@ func (n *NumpyAdapter) ReadUint8(count int) ([]uint8, error) {
switch n.npyReader.Header.Descr.Type {
case "u1", "<u1", "|u1", "uint8":
default:
return nil, errors.New("Numpy adapter: numpy data is not uint8 type")
return nil, errors.New("numpy data is not uint8 type")
}
// avoid read overflow
readSize := n.checkCount(count)
if readSize <= 0 {
return nil, errors.New("Numpy adapter: end of uint8 file, nothing to read")
return nil, errors.New("end of uint8 file, nothing to read")
}
// read data
data := make([]uint8, readSize)
err := binary.Read(n.reader, n.order, &data)
if err != nil {
return nil, fmt.Errorf("Numpy adapter: failed to read uint8 data with count %d, error: %w", readSize, err)
return nil, fmt.Errorf("failed to read uint8 data with count %d, error: %w", readSize, err)
}
// update read position after successfully read
@ -312,25 +313,25 @@ func (n *NumpyAdapter) ReadUint8(count int) ([]uint8, error) {
func (n *NumpyAdapter) ReadInt8(count int) ([]int8, error) {
if count <= 0 {
return nil, errors.New("Numpy adapter: cannot read int8 data with a zero or nagative count")
return nil, errors.New("cannot read int8 data with a zero or nagative count")
}
// incorrect type
if n.dataType != schemapb.DataType_Int8 {
return nil, errors.New("Numpy adapter: numpy data is not int8 type")
return nil, errors.New("numpy data is not int8 type")
}
// avoid read overflow
readSize := n.checkCount(count)
if readSize <= 0 {
return nil, errors.New("Numpy adapter: end of int8 file, nothing to read")
return nil, errors.New("end of int8 file, nothing to read")
}
// read data
data := make([]int8, readSize)
err := binary.Read(n.reader, n.order, &data)
if err != nil {
return nil, fmt.Errorf("Numpy adapter: failed to read int8 data with count %d, error: %w", readSize, err)
return nil, fmt.Errorf("failed to read int8 data with count %d, error: %w", readSize, err)
}
// update read position after successfully read
@ -341,25 +342,25 @@ func (n *NumpyAdapter) ReadInt8(count int) ([]int8, error) {
func (n *NumpyAdapter) ReadInt16(count int) ([]int16, error) {
if count <= 0 {
return nil, errors.New("Numpy adapter: cannot read int16 data with a zero or nagative count")
return nil, errors.New("cannot read int16 data with a zero or nagative count")
}
// incorrect type
if n.dataType != schemapb.DataType_Int16 {
return nil, errors.New("Numpy adapter: numpy data is not int16 type")
return nil, errors.New("numpy data is not int16 type")
}
// avoid read overflow
readSize := n.checkCount(count)
if readSize <= 0 {
return nil, errors.New("Numpy adapter: end of int16 file, nothing to read")
return nil, errors.New("end of int16 file, nothing to read")
}
// read data
data := make([]int16, readSize)
err := binary.Read(n.reader, n.order, &data)
if err != nil {
return nil, fmt.Errorf("Numpy adapter: failed to read int16 data with count %d, error: %w", readSize, err)
return nil, fmt.Errorf("failed to read int16 data with count %d, error: %w", readSize, err)
}
// update read position after successfully read
@ -370,25 +371,25 @@ func (n *NumpyAdapter) ReadInt16(count int) ([]int16, error) {
func (n *NumpyAdapter) ReadInt32(count int) ([]int32, error) {
if count <= 0 {
return nil, errors.New("Numpy adapter: cannot read int32 data with a zero or nagative count")
return nil, errors.New("cannot read int32 data with a zero or nagative count")
}
// incorrect type
if n.dataType != schemapb.DataType_Int32 {
return nil, errors.New("Numpy adapter: numpy data is not int32 type")
return nil, errors.New("numpy data is not int32 type")
}
// avoid read overflow
readSize := n.checkCount(count)
if readSize <= 0 {
return nil, errors.New("Numpy adapter: end of int32 file, nothing to read")
return nil, errors.New("end of int32 file, nothing to read")
}
// read data
data := make([]int32, readSize)
err := binary.Read(n.reader, n.order, &data)
if err != nil {
return nil, fmt.Errorf("Numpy adapter: failed to read int32 data with count %d, error: %w", readSize, err)
return nil, fmt.Errorf("failed to read int32 data with count %d, error: %w", readSize, err)
}
// update read position after successfully read
@ -399,25 +400,25 @@ func (n *NumpyAdapter) ReadInt32(count int) ([]int32, error) {
func (n *NumpyAdapter) ReadInt64(count int) ([]int64, error) {
if count <= 0 {
return nil, errors.New("Numpy adapter: cannot read int64 data with a zero or nagative count")
return nil, errors.New("cannot read int64 data with a zero or nagative count")
}
// incorrect type
if n.dataType != schemapb.DataType_Int64 {
return nil, errors.New("Numpy adapter: numpy data is not int64 type")
return nil, errors.New("numpy data is not int64 type")
}
// avoid read overflow
readSize := n.checkCount(count)
if readSize <= 0 {
return nil, errors.New("Numpy adapter: end of int64 file, nothing to read")
return nil, errors.New("end of int64 file, nothing to read")
}
// read data
data := make([]int64, readSize)
err := binary.Read(n.reader, n.order, &data)
if err != nil {
return nil, fmt.Errorf("Numpy adapter: failed to read int64 data with count %d, error: %w", readSize, err)
return nil, fmt.Errorf("failed to read int64 data with count %d, error: %w", readSize, err)
}
// update read position after successfully read
@ -428,25 +429,25 @@ func (n *NumpyAdapter) ReadInt64(count int) ([]int64, error) {
func (n *NumpyAdapter) ReadFloat32(count int) ([]float32, error) {
if count <= 0 {
return nil, errors.New("Numpy adapter: cannot read float32 data with a zero or nagative count")
return nil, errors.New("cannot read float32 data with a zero or nagative count")
}
// incorrect type
if n.dataType != schemapb.DataType_Float {
return nil, errors.New("Numpy adapter: numpy data is not float32 type")
return nil, errors.New("numpy data is not float32 type")
}
// avoid read overflow
readSize := n.checkCount(count)
if readSize <= 0 {
return nil, errors.New("Numpy adapter: end of float32 file, nothing to read")
return nil, errors.New("end of float32 file, nothing to read")
}
// read data
data := make([]float32, readSize)
err := binary.Read(n.reader, n.order, &data)
if err != nil {
return nil, fmt.Errorf("Numpy adapter: failed to read float32 data with count %d, error: %w", readSize, err)
return nil, fmt.Errorf("failed to read float32 data with count %d, error: %w", readSize, err)
}
// update read position after successfully read
@ -457,25 +458,25 @@ func (n *NumpyAdapter) ReadFloat32(count int) ([]float32, error) {
func (n *NumpyAdapter) ReadFloat64(count int) ([]float64, error) {
if count <= 0 {
return nil, errors.New("Numpy adapter: cannot read float64 data with a zero or nagative count")
return nil, errors.New("cannot read float64 data with a zero or nagative count")
}
// incorrect type
if n.dataType != schemapb.DataType_Double {
return nil, errors.New("Numpy adapter: numpy data is not float64 type")
return nil, errors.New("numpy data is not float64 type")
}
// avoid read overflow
readSize := n.checkCount(count)
if readSize <= 0 {
return nil, errors.New("Numpy adapter: end of float64 file, nothing to read")
return nil, errors.New("end of float64 file, nothing to read")
}
// read data
data := make([]float64, readSize)
err := binary.Read(n.reader, n.order, &data)
if err != nil {
return nil, fmt.Errorf("Numpy adapter: failed to read float64 data with count %d, error: %w", readSize, err)
return nil, fmt.Errorf("failed to read float64 data with count %d, error: %w", readSize, err)
}
// update read position after successfully read
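Every ReadXxx method above follows the same core step: check the remaining count, allocate a typed slice, and let encoding/binary fill it according to the byte order recorded in the numpy header. A minimal sketch of that step with a synthetic little-endian payload (not a real .npy file):

	package main

	import (
		"bytes"
		"encoding/binary"
		"fmt"
	)

	func main() {
		// synthetic little-endian payload holding three int64 values
		var buf bytes.Buffer
		if err := binary.Write(&buf, binary.LittleEndian, []int64{10, 20, 30}); err != nil {
			panic(err)
		}

		// the ReadXxx pattern: allocate a typed slice of the requested size
		// and let binary.Read fill it using the detected byte order
		readSize := 3
		data := make([]int64, readSize)
		if err := binary.Read(&buf, binary.LittleEndian, &data); err != nil {
			panic(fmt.Errorf("failed to read int64 data with count %d, error: %w", readSize, err))
		}
		fmt.Println(data) // [10 20 30]
	}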
@ -486,26 +487,26 @@ func (n *NumpyAdapter) ReadFloat64(count int) ([]float64, error) {
func (n *NumpyAdapter) ReadString(count int) ([]string, error) {
if count <= 0 {
return nil, errors.New("Numpy adapter: cannot read varhar data with a zero or nagative count")
return nil, errors.New("cannot read varhar data with a zero or nagative count")
}
// incorrect type
if n.dataType != schemapb.DataType_VarChar {
return nil, errors.New("Numpy adapter: numpy data is not varhar type")
return nil, errors.New("numpy data is not varhar type")
}
// varchar length, this is the max length, some item is shorter than this length, but they also occupy bytes of max length
maxLen, utf, err := stringLen(n.npyReader.Header.Descr.Type)
if err != nil || maxLen <= 0 {
log.Error("Numpy adapter: failed to get max length of varchar from numpy file header", zap.Int("maxLen", maxLen), zap.Error(err))
return nil, fmt.Errorf("Numpy adapter: failed to get max length %d of varchar from numpy file header, error: %w", maxLen, err)
return nil, fmt.Errorf("failed to get max length %d of varchar from numpy file header, error: %w", maxLen, err)
}
log.Info("Numpy adapter: get varchar max length from numpy file header", zap.Int("maxLen", maxLen), zap.Bool("utf", utf))
// avoid read overflow
readSize := n.checkCount(count)
if readSize <= 0 {
return nil, errors.New("Numpy adapter: end of varhar file, nothing to read")
return nil, errors.New("end of varhar file, nothing to read")
}
// read data
@ -522,13 +523,13 @@ func (n *NumpyAdapter) ReadString(count int) ([]string, error) {
raw, err := ioutil.ReadAll(io.LimitReader(n.reader, utf8.UTFMax*int64(maxLen)))
if err != nil {
log.Error("Numpy adapter: failed to read utf32 bytes from numpy file", zap.Int("i", i), zap.Error(err))
return nil, fmt.Errorf("Numpy adapter: failed to read utf32 bytes from numpy file, error: %w", err)
return nil, fmt.Errorf("failed to read utf32 bytes from numpy file, error: %w", err)
}
str, err := decodeUtf32(raw, n.order)
if err != nil {
log.Error("Numpy adapter: failed todecode utf32 bytes", zap.Int("i", i), zap.Error(err))
return nil, fmt.Errorf("Numpy adapter: failed to decode utf32 bytes, error: %w", err)
return nil, fmt.Errorf("failed to decode utf32 bytes, error: %w", err)
}
data = append(data, str)
@ -538,7 +539,7 @@ func (n *NumpyAdapter) ReadString(count int) ([]string, error) {
buf, err := ioutil.ReadAll(io.LimitReader(n.reader, int64(maxLen)))
if err != nil {
log.Error("Numpy adapter: failed to read ascii bytes from numpy file", zap.Int("i", i), zap.Error(err))
return nil, fmt.Errorf("Numpy adapter: failed to read ascii bytes from numpy file, error: %w", err)
return nil, fmt.Errorf("failed to read ascii bytes from numpy file, error: %w", err)
}
n := bytes.Index(buf, []byte{0})
if n > 0 {

View File

@ -64,7 +64,7 @@ func NewNumpyParser(ctx context.Context, collectionSchema *schemapb.CollectionSc
func (p *NumpyParser) validate(adapter *NumpyAdapter, fieldName string) error {
if adapter == nil {
log.Error("Numpy parser: numpy adapter is nil")
return errors.New("Numpy parser: numpy adapter is nil")
return errors.New("numpy adapter is nil")
}
// check existence of the target field
@ -79,7 +79,7 @@ func (p *NumpyParser) validate(adapter *NumpyAdapter, fieldName string) error {
if p.columnDesc.name == "" {
log.Error("Numpy parser: Numpy parser: the field is not found in collection schema", zap.String("fieldName", fieldName))
return fmt.Errorf("Numpy parser: the field name '%s' is not found in collection schema", fieldName)
return fmt.Errorf("the field name '%s' is not found in collection schema", fieldName)
}
p.columnDesc.dt = schema.DataType
@ -96,14 +96,14 @@ func (p *NumpyParser) validate(adapter *NumpyAdapter, fieldName string) error {
if elementType != schemapb.DataType_Float && elementType != schemapb.DataType_Double {
log.Error("Numpy parser: illegal data type of numpy file for float vector field", zap.Any("dataType", elementType),
zap.String("fieldName", fieldName))
return fmt.Errorf("Numpy parser: illegal data type %s of numpy file for float vector field '%s'", getTypeName(elementType), schema.GetName())
return fmt.Errorf("illegal data type %s of numpy file for float vector field '%s'", getTypeName(elementType), schema.GetName())
}
// vector field, the shape should be 2
if len(shape) != 2 {
log.Error("Numpy parser: illegal shape of numpy file for float vector field, shape should be 2", zap.Int("shape", len(shape)),
zap.String("fieldName", fieldName))
return fmt.Errorf("Numpy parser: illegal shape %d of numpy file for float vector field '%s', shape should be 2", shape, schema.GetName())
return fmt.Errorf("illegal shape %d of numpy file for float vector field '%s', shape should be 2", shape, schema.GetName())
}
// shape[0] is row count, shape[1] is element count per row
@ -117,21 +117,21 @@ func (p *NumpyParser) validate(adapter *NumpyAdapter, fieldName string) error {
if shape[1] != p.columnDesc.dimension {
log.Error("Numpy parser: illegal dimension of numpy file for float vector field", zap.String("fieldName", fieldName),
zap.Int("numpyDimension", shape[1]), zap.Int("fieldDimension", p.columnDesc.dimension))
return fmt.Errorf("Numpy parser: illegal dimension %d of numpy file for float vector field '%s', dimension should be %d",
return fmt.Errorf("illegal dimension %d of numpy file for float vector field '%s', dimension should be %d",
shape[1], schema.GetName(), p.columnDesc.dimension)
}
} else if schemapb.DataType_BinaryVector == schema.DataType {
if elementType != schemapb.DataType_BinaryVector {
log.Error("Numpy parser: illegal data type of numpy file for binary vector field", zap.Any("dataType", elementType),
zap.String("fieldName", fieldName))
return fmt.Errorf("Numpy parser: illegal data type %s of numpy file for binary vector field '%s'", getTypeName(elementType), schema.GetName())
return fmt.Errorf("illegal data type %s of numpy file for binary vector field '%s'", getTypeName(elementType), schema.GetName())
}
// vector field, the shape should be 2
if len(shape) != 2 {
log.Error("Numpy parser: illegal shape of numpy file for binary vector field, shape should be 2", zap.Int("shape", len(shape)),
zap.String("fieldName", fieldName))
return fmt.Errorf("Numpy parser: illegal shape %d of numpy file for binary vector field '%s', shape should be 2", shape, schema.GetName())
return fmt.Errorf("illegal shape %d of numpy file for binary vector field '%s', shape should be 2", shape, schema.GetName())
}
// shape[0] is row count, shape[1] is element count per row
@ -145,14 +145,14 @@ func (p *NumpyParser) validate(adapter *NumpyAdapter, fieldName string) error {
if shape[1] != p.columnDesc.dimension/8 {
log.Error("Numpy parser: illegal dimension of numpy file for float vector field", zap.String("fieldName", fieldName),
zap.Int("numpyDimension", shape[1]*8), zap.Int("fieldDimension", p.columnDesc.dimension))
return fmt.Errorf("Numpy parser: illegal dimension %d of numpy file for binary vector field '%s', dimension should be %d",
return fmt.Errorf("illegal dimension %d of numpy file for binary vector field '%s', dimension should be %d",
shape[1]*8, schema.GetName(), p.columnDesc.dimension)
}
} else {
if elementType != schema.DataType {
log.Error("Numpy parser: illegal data type of numpy file for scalar field", zap.Any("numpyDataType", elementType),
zap.String("fieldName", fieldName), zap.Any("fieldDataType", schema.DataType))
return fmt.Errorf("Numpy parser: illegal data type %s of numpy file for scalar field '%s' with type %s",
return fmt.Errorf("illegal data type %s of numpy file for scalar field '%s' with type %s",
getTypeName(elementType), schema.GetName(), getTypeName(schema.DataType))
}
@ -160,7 +160,7 @@ func (p *NumpyParser) validate(adapter *NumpyAdapter, fieldName string) error {
if len(shape) != 1 {
log.Error("Numpy parser: illegal shape of numpy file for scalar field, shape should be 1", zap.Int("shape", len(shape)),
zap.String("fieldName", fieldName))
return fmt.Errorf("Numpy parser: illegal shape %d of numpy file for scalar field '%s', shape should be 1", shape, schema.GetName())
return fmt.Errorf("illegal shape %d of numpy file for scalar field '%s', shape should be 1", shape, schema.GetName())
}
p.columnDesc.elementCount = shape[0]
@ -176,7 +176,7 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
case schemapb.DataType_Bool:
data, err := adapter.ReadBool(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to read bool array", zap.Error(err))
return err
}
@ -188,7 +188,7 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
case schemapb.DataType_Int8:
data, err := adapter.ReadInt8(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to read int8 array", zap.Error(err))
return err
}
@ -199,7 +199,7 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
case schemapb.DataType_Int16:
data, err := adapter.ReadInt16(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to int16 bool array", zap.Error(err))
return err
}
@ -210,7 +210,7 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
case schemapb.DataType_Int32:
data, err := adapter.ReadInt32(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to read int32 array", zap.Error(err))
return err
}
@ -221,7 +221,7 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
case schemapb.DataType_Int64:
data, err := adapter.ReadInt64(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to read int64 array", zap.Error(err))
return err
}
@ -232,7 +232,7 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
case schemapb.DataType_Float:
data, err := adapter.ReadFloat32(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to read float array", zap.Error(err))
return err
}
@ -243,7 +243,7 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
case schemapb.DataType_Double:
data, err := adapter.ReadFloat64(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to read double array", zap.Error(err))
return err
}
@ -254,7 +254,7 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
case schemapb.DataType_VarChar:
data, err := adapter.ReadString(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to read varchar array", zap.Error(err))
return err
}
@ -265,7 +265,7 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
case schemapb.DataType_BinaryVector:
data, err := adapter.ReadUint8(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to read binary vector array", zap.Error(err))
return err
}
@ -285,14 +285,14 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
if elementType == schemapb.DataType_Float {
data, err = adapter.ReadFloat32(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to read float vector array", zap.Error(err))
return err
}
} else if elementType == schemapb.DataType_Double {
data = make([]float32, 0, p.columnDesc.elementCount)
data64, err := adapter.ReadFloat64(p.columnDesc.elementCount)
if err != nil {
log.Error(err.Error())
log.Error("Numpy parser: failed to read float vector array", zap.Error(err))
return err
}
@ -308,7 +308,7 @@ func (p *NumpyParser) consume(adapter *NumpyAdapter) error {
}
default:
log.Error("Numpy parser: unsupported data type of field", zap.Any("dataType", p.columnDesc.dt), zap.String("fieldName", p.columnDesc.name))
return fmt.Errorf("Numpy parser: unsupported data type %s of field '%s'", getTypeName(p.columnDesc.dt), p.columnDesc.name)
return fmt.Errorf("unsupported data type %s of field '%s'", getTypeName(p.columnDesc.dt), p.columnDesc.name)
}
return nil