// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importutil

import (
	"context"
	"encoding/json"
	"fmt"
	"path"
	"runtime/debug"
	"strconv"
	"strings"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type (
	BlockData map[storage.FieldID]storage.FieldData // a map of field ID to field data
	ShardData map[int64]BlockData                   // a map of partition ID to block data
)

func isCanceled(ctx context.Context) bool {
	// canceled?
	select {
	case <-ctx.Done():
		return true
	default:
		break
	}
	return false
}

func initBlockData(collectionSchema *schemapb.CollectionSchema) BlockData {
	blockData := make(BlockData)
	// rowID field is a hidden field with fieldID=0, it is always auto-generated by IDAllocator
	// if primary key is int64 and autoID=true, primary key field is equal to rowID field
	blockData[common.RowIDField] = &storage.Int64FieldData{
		Data: make([]int64, 0),
	}

	for i := 0; i < len(collectionSchema.Fields); i++ {
		schema := collectionSchema.Fields[i]
		switch schema.DataType {
		case schemapb.DataType_Bool:
			blockData[schema.GetFieldID()] = &storage.BoolFieldData{
				Data: make([]bool, 0),
			}
		case schemapb.DataType_Float:
			blockData[schema.GetFieldID()] = &storage.FloatFieldData{
				Data: make([]float32, 0),
			}
		case schemapb.DataType_Double:
			blockData[schema.GetFieldID()] = &storage.DoubleFieldData{
				Data: make([]float64, 0),
			}
		case schemapb.DataType_Int8:
			blockData[schema.GetFieldID()] = &storage.Int8FieldData{
				Data: make([]int8, 0),
			}
		case schemapb.DataType_Int16:
			blockData[schema.GetFieldID()] = &storage.Int16FieldData{
				Data: make([]int16, 0),
			}
		case schemapb.DataType_Int32:
			blockData[schema.GetFieldID()] = &storage.Int32FieldData{
				Data: make([]int32, 0),
			}
		case schemapb.DataType_Int64:
			blockData[schema.GetFieldID()] = &storage.Int64FieldData{
				Data: make([]int64, 0),
			}
		case schemapb.DataType_BinaryVector:
			dim, _ := getFieldDimension(schema)
			blockData[schema.GetFieldID()] = &storage.BinaryVectorFieldData{
				Data: make([]byte, 0),
				Dim:  dim,
			}
		case schemapb.DataType_FloatVector:
			dim, _ := getFieldDimension(schema)
			blockData[schema.GetFieldID()] = &storage.FloatVectorFieldData{
				Data: make([]float32, 0),
				Dim:  dim,
			}
		case schemapb.DataType_String, schemapb.DataType_VarChar:
			blockData[schema.GetFieldID()] = &storage.StringFieldData{
				Data: make([]string, 0),
			}
		case schemapb.DataType_JSON:
			blockData[schema.GetFieldID()] = &storage.JSONFieldData{
				Data: make([][]byte, 0),
			}
		case schemapb.DataType_Array:
			blockData[schema.GetFieldID()] = &storage.ArrayFieldData{
				Data:        make([]*schemapb.ScalarField, 0),
				ElementType: schema.GetElementType(),
			}
		default:
			log.Warn("Import util: unsupported data type", zap.String("DataType", getTypeName(schema.DataType)))
			return nil
		}
	}

	return blockData
}

func initShardData(collectionSchema *schemapb.CollectionSchema, partitionIDs []int64) ShardData {
	shardData := make(ShardData)
	for i := 0; i < len(partitionIDs); i++ {
		blockData := initBlockData(collectionSchema)
		if blockData == nil {
			return nil
		}
		shardData[partitionIDs[i]] = blockData
	}

	return shardData
}
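
// A minimal usage sketch (partition IDs are illustrative): initShardData builds one
// empty BlockData per partition, keyed by partition ID; each BlockData holds empty
// storage.FieldData for every schema field plus the hidden rowID field.
//
//	shard := initShardData(collectionSchema, []int64{100, 101})
//	// shard[100][common.RowIDField].RowNum() == 0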

func parseFloat(s string, bitsize int, fieldName string) (float64, error) {
	value, err := strconv.ParseFloat(s, bitsize)
	if err != nil {
		return 0, merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%s' for field '%s', error: %v", s, fieldName, err))
	}

	err = typeutil.VerifyFloat(value)
	if err != nil {
		return 0, merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%s' for field '%s', error: %v", s, fieldName, err))
	}

	return value, nil
}
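
// Usage sketch (illustrative only): parseFloat parses with the given bit size and
// rejects non-finite values via typeutil.VerifyFloat.
//
//	v, err := parseFloat("3.14", 32, "field_float")  // v ≈ 3.14, err == nil
//	_, err = parseFloat("NaN", 64, "field_double")   // err != nil (illegal value)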

// Validator is a field value validator
type Validator struct {
	convertFunc func(obj interface{}, field storage.FieldData) error // convert data function
	primaryKey  bool                                                 // true for primary key
	autoID      bool                                                 // only for primary key field
	isString    bool                                                 // for string field
	dimension   int                                                  // only for vector field
	fieldName   string                                               // field name
	fieldID     int64                                                // field ID
}

// initValidators constructs validator methods and data conversion methods
func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[storage.FieldID]*Validator) error {
	if collectionSchema == nil {
		return merr.WrapErrImportFailed("collection schema is nil")
	}

	for i := 0; i < len(collectionSchema.Fields); i++ {
		schema := collectionSchema.Fields[i]

		validators[schema.GetFieldID()] = &Validator{}
		validators[schema.GetFieldID()].primaryKey = schema.GetIsPrimaryKey()
		validators[schema.GetFieldID()].autoID = schema.GetAutoID()
		validators[schema.GetFieldID()].fieldName = schema.GetName()
		validators[schema.GetFieldID()].fieldID = schema.GetFieldID()
		validators[schema.GetFieldID()].isString = false

		switch schema.DataType {
		case schemapb.DataType_Bool:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if value, ok := obj.(bool); ok {
					field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, value)
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for bool type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Float:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := parseFloat(string(num), 32, schema.GetName())
					if err != nil {
						return err
					}
					field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, float32(value))
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for float type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Double:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := parseFloat(string(num), 64, schema.GetName())
					if err != nil {
						return err
					}
					field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, value)
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for double type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Int8:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := strconv.ParseInt(string(num), 0, 8)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for int8 field '%s', error: %v", num, schema.GetName(), err))
					}
					field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, int8(value))
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int8 type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Int16:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := strconv.ParseInt(string(num), 0, 16)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for int16 field '%s', error: %v", num, schema.GetName(), err))
					}
					field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, int16(value))
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int16 type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Int32:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := strconv.ParseInt(string(num), 0, 32)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for int32 field '%s', error: %v", num, schema.GetName(), err))
					}
					field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, int32(value))
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int32 type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Int64:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if num, ok := obj.(json.Number); ok {
					value, err := strconv.ParseInt(string(num), 0, 64)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for int64 field '%s', error: %v", num, schema.GetName(), err))
					}
					field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, value)
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int64 type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_BinaryVector:
			dim, err := getFieldDimension(schema)
			if err != nil {
				return err
			}
			validators[schema.GetFieldID()].dimension = dim

			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				arr, ok := obj.([]interface{})
				if !ok {
					return merr.WrapErrImportFailed(fmt.Sprintf("'%v' is not an array for binary vector field '%s'", obj, schema.GetName()))
				}
				// we use uint8 to represent binary vector in json file, each uint8 value represents 8 dimensions.
				if len(arr)*8 != dim {
					return merr.WrapErrImportFailed(fmt.Sprintf("bit size %d doesn't equal to vector dimension %d of field '%s'", len(arr)*8, dim, schema.GetName()))
				}

				for i := 0; i < len(arr); i++ {
					if num, ok := arr[i].(json.Number); ok {
						value, err := strconv.ParseUint(string(num), 0, 8)
						if err != nil {
							return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for binary vector field '%s', error: %v", num, schema.GetName(), err))
						}
						field.(*storage.BinaryVectorFieldData).Data = append(field.(*storage.BinaryVectorFieldData).Data, byte(value))
					} else {
						return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for binary vector field '%s'", obj, schema.GetName()))
					}
				}
				return nil
			}
		case schemapb.DataType_FloatVector:
			dim, err := getFieldDimension(schema)
			if err != nil {
				return err
			}
			validators[schema.GetFieldID()].dimension = dim

			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				arr, ok := obj.([]interface{})
				if !ok {
					return merr.WrapErrImportFailed(fmt.Sprintf("'%v' is not an array for float vector field '%s'", obj, schema.GetName()))
				}
				if len(arr) != dim {
					return merr.WrapErrImportFailed(fmt.Sprintf("array size %d doesn't equal to vector dimension %d of field '%s'", len(arr), dim, schema.GetName()))
				}

				for i := 0; i < len(arr); i++ {
					if num, ok := arr[i].(json.Number); ok {
						value, err := parseFloat(string(num), 32, schema.GetName())
						if err != nil {
							return err
						}
						field.(*storage.FloatVectorFieldData).Data = append(field.(*storage.FloatVectorFieldData).Data, float32(value))
					} else {
						return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for float vector field '%s'", obj, schema.GetName()))
					}
				}

				return nil
			}
		case schemapb.DataType_String, schemapb.DataType_VarChar:
			validators[schema.GetFieldID()].isString = true

			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				if value, ok := obj.(string); ok {
					field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, value)
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for varchar type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_JSON:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				// for JSON data, we accept two kinds of input: string and map[string]interface
				// user can write JSON content as {"FieldJSON": "{\"x\": 8}"} or {"FieldJSON": {"x": 8}}
				if value, ok := obj.(string); ok {
					var dummy interface{}
					err := json.Unmarshal([]byte(value), &dummy)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value '%v' for JSON field '%s', error: %v", value, schema.GetName(), err))
					}
					field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, []byte(value))
				} else if mp, ok := obj.(map[string]interface{}); ok {
					bs, err := json.Marshal(mp)
					if err != nil {
						return merr.WrapErrImportFailed(fmt.Sprintf("failed to parse value for JSON field '%s', error: %v", schema.GetName(), err))
					}
					field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, bs)
				} else {
					return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for JSON type field '%s'", obj, schema.GetName()))
				}
				return nil
			}
		case schemapb.DataType_Array:
			validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
				arr, ok := obj.([]interface{})
				if !ok {
					return merr.WrapErrImportFailed(fmt.Sprintf("'%v' is not an array for array field '%s'", obj, schema.GetName()))
				}
				return getArrayElementData(schema, arr, field)
			}
		default:
			return merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type: %s", getTypeName(collectionSchema.Fields[i].DataType)))
		}
	}

	return nil
}
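
// Illustrative input (field names hypothetical): each row of the import JSON is
// assumed to be decoded into map[string]interface{} with json.Decoder.UseNumber(),
// so numeric values reach the convertFunc above as json.Number. A row such as
//
//	{
//		"field_bool":   true,
//		"field_int64":  100,
//		"field_float":  3.14,
//		"field_vector": [0.1, 0.2, 0.3, 0.4],
//		"field_json":   {"x": 8}
//	}
//
// is consumed field by field by the corresponding Validator.convertFunc.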

func getArrayElementData(schema *schemapb.FieldSchema, arr []interface{}, field storage.FieldData) error {
	switch schema.GetElementType() {
	case schemapb.DataType_Bool:
		boolData := make([]bool, 0)
		for i := 0; i < len(arr); i++ {
			if value, ok := arr[i].(bool); ok {
				boolData = append(boolData, value)
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for bool array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_BoolData{
				BoolData: &schemapb.BoolArray{
					Data: boolData,
				},
			},
		})
	case schemapb.DataType_Int8:
		int8Data := make([]int32, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := strconv.ParseInt(string(num), 0, 8)
				if err != nil {
					return err
				}
				int8Data = append(int8Data, int32(value))
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_IntData{
				IntData: &schemapb.IntArray{
					Data: int8Data,
				},
			},
		})
	case schemapb.DataType_Int16:
		int16Data := make([]int32, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := strconv.ParseInt(string(num), 0, 16)
				if err != nil {
					return err
				}
				int16Data = append(int16Data, int32(value))
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_IntData{
				IntData: &schemapb.IntArray{
					Data: int16Data,
				},
			},
		})
	case schemapb.DataType_Int32:
		intData := make([]int32, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := strconv.ParseInt(string(num), 0, 32)
				if err != nil {
					return err
				}
				intData = append(intData, int32(value))
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for int array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_IntData{
				IntData: &schemapb.IntArray{
					Data: intData,
				},
			},
		})
	case schemapb.DataType_Int64:
		longData := make([]int64, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := strconv.ParseInt(string(num), 0, 64)
				if err != nil {
					return err
				}
				longData = append(longData, value)
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for long array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_LongData{
				LongData: &schemapb.LongArray{
					Data: longData,
				},
			},
		})
	case schemapb.DataType_Float:
		floatData := make([]float32, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				value, err := parseFloat(string(num), 32, schema.GetName())
				if err != nil {
					return err
				}
				floatData = append(floatData, float32(value))
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for float array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_FloatData{
				FloatData: &schemapb.FloatArray{
					Data: floatData,
				},
			},
		})
	case schemapb.DataType_Double:
		doubleData := make([]float64, 0)
		for i := 0; i < len(arr); i++ {
			if num, ok := arr[i].(json.Number); ok {
				// parse with 64 bits since this is a double field
				value, err := parseFloat(string(num), 64, schema.GetName())
				if err != nil {
					return err
				}
				doubleData = append(doubleData, value)
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for double array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, &schemapb.ScalarField{
			Data: &schemapb.ScalarField_DoubleData{
				DoubleData: &schemapb.DoubleArray{
					Data: doubleData,
				},
			},
		})
	case schemapb.DataType_String, schemapb.DataType_VarChar:
		stringFieldData := &schemapb.ScalarField{
			Data: &schemapb.ScalarField_StringData{
				StringData: &schemapb.StringArray{
					Data: make([]string, 0),
				},
			},
		}
		for i := 0; i < len(arr); i++ {
			if str, ok := arr[i].(string); ok {
				stringFieldData.GetStringData().Data = append(stringFieldData.GetStringData().Data, str)
			} else {
				return merr.WrapErrImportFailed(fmt.Sprintf("illegal value '%v' for string array field '%s'", arr, schema.GetName()))
			}
		}
		field.(*storage.ArrayFieldData).Data = append(field.(*storage.ArrayFieldData).Data, stringFieldData)
	default:
		return merr.WrapErrImportFailed(fmt.Sprintf("unsupported element type: %v", getTypeName(schema.GetElementType())))
	}
	return nil
}
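
// Illustrative input (field names hypothetical): an array field value arrives as
// []interface{}, e.g. {"field_int_array": [1, 2, 3]} or {"field_str_array": ["a", "b"]},
// and getArrayElementData packs it into one schemapb.ScalarField (IntArray, StringArray, ...)
// appended to the storage.ArrayFieldData of that field.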

func printFieldsDataInfo(fieldsData BlockData, msg string, files []string) {
	stats := make([]zapcore.Field, 0)
	for k, v := range fieldsData {
		stats = append(stats, zap.Int(strconv.FormatInt(k, 10), v.RowNum()))
	}

	if len(files) > 0 {
		stats = append(stats, zap.Any(Files, files))
	}
	log.Info(msg, stats...)
}

// GetFileNameAndExt extracts file name and extension
// for example: "/a/b/c.ttt" returns "c" and ".ttt"
func GetFileNameAndExt(filePath string) (string, string) {
	fileName := path.Base(filePath)
	fileType := path.Ext(fileName)
	fileNameWithoutExt := strings.TrimSuffix(fileName, fileType)
	return fileNameWithoutExt, fileType
}

// getFieldDimension gets the dimension of a vector field
func getFieldDimension(schema *schemapb.FieldSchema) (int, error) {
	for _, kvPair := range schema.GetTypeParams() {
		key, value := kvPair.GetKey(), kvPair.GetValue()
		if key == common.DimKey {
			dim, err := strconv.Atoi(value)
			if err != nil {
				return 0, merr.WrapErrImportFailed(fmt.Sprintf("illegal vector dimension '%s' for field '%s', error: %v", value, schema.GetName(), err))
			}
			return dim, nil
		}
	}

	return 0, merr.WrapErrImportFailed(fmt.Sprintf("vector dimension is not defined for field '%s'", schema.GetName()))
}
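
// Illustrative schema snippet (hypothetical field): the dimension is read from the
// common.DimKey type param of a vector field.
//
//	field := &schemapb.FieldSchema{
//		Name:       "field_vector",
//		DataType:   schemapb.DataType_FloatVector,
//		TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "4"}},
//	}
//	// getFieldDimension(field) returns 4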

// triggerGC triggers golang gc to return all free memory back to the underlying system at once,
// Note: this operation is expensive, and can lead to latency spikes as it holds the heap lock through the whole process
func triggerGC() {
	debug.FreeOSMemory()
}

// if user didn't provide dynamic data, fill the dynamic field by "{}"
func fillDynamicData(blockData BlockData, collectionSchema *schemapb.CollectionSchema) error {
	if !collectionSchema.GetEnableDynamicField() {
		return nil
	}

	dynamicFieldID := int64(-1)
	for i := 0; i < len(collectionSchema.Fields); i++ {
		schema := collectionSchema.Fields[i]
		if schema.GetIsDynamic() {
			dynamicFieldID = schema.GetFieldID()
			break
		}
	}

	if dynamicFieldID < 0 {
		return merr.WrapErrImportFailed("the collection schema is dynamic but dynamic field is not found")
	}

	rowCount := 0
	if len(blockData) > 0 {
		for id, v := range blockData {
			if id == dynamicFieldID {
				continue
			}
			rowCount = v.RowNum()
		}
	}

	dynamicData, ok := blockData[dynamicFieldID]
	if !ok || dynamicData == nil {
		// dynamic field data is not provided, create new one
		dynamicData = &storage.JSONFieldData{
			Data: make([][]byte, 0),
		}
	}

	if dynamicData.RowNum() < rowCount {
		// fill the dynamic data by an empty JSON object, make sure the row count is equal to other fields
		data := dynamicData.(*storage.JSONFieldData)
		bs := []byte("{}")
		dynamicRowCount := dynamicData.RowNum()
		for i := 0; i < rowCount-dynamicRowCount; i++ {
			data.Data = append(data.Data, bs)
		}
	}

	blockData[dynamicFieldID] = dynamicData
	return nil
}
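
// Illustrative behavior: if a block holds 3 rows for the regular fields but the
// dynamic field was only given 1 row (or none), fillDynamicData appends "{}" until
// the dynamic JSON field also reports RowNum() == 3.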

// tryFlushBlocks does two things:
//  1. if the accumulated data of a block exceeds blockSize, call callFlushFunc to generate a new binlog file
//  2. if the total accumulated data exceeds maxTotalSize, call callFlushFunc to flush the biggest block
func tryFlushBlocks(ctx context.Context,
	shardsData []ShardData,
	collectionSchema *schemapb.CollectionSchema,
	callFlushFunc ImportFlushFunc,
	blockSize int64,
	maxTotalSize int64,
	force bool,
) error {
	totalSize := 0
	biggestSize := 0
	biggestItem := -1
	biggestPartition := int64(-1)

	// 1. if the accumulated data of a block exceeds blockSize, call callFlushFunc to generate a new binlog file
	for i := 0; i < len(shardsData); i++ {
		// outside context might be canceled(service stop, or future enhancement for canceling import task)
		if isCanceled(ctx) {
			log.Warn("Import util: import task was canceled")
			return merr.WrapErrImportFailed("import task was canceled")
		}

		shardData := shardsData[i]
		for partitionID, blockData := range shardData {
			err := fillDynamicData(blockData, collectionSchema)
			if err != nil {
				log.Warn("Import util: failed to fill dynamic field", zap.Error(err))
				return merr.WrapErrImportFailed(fmt.Sprintf("failed to fill dynamic field, error: %v", err))
			}

			// Note: even if rowCount is 0, the size is still non-zero
			size := 0
			rowCount := 0
			for _, fieldData := range blockData {
				size += fieldData.GetMemorySize()
				rowCount = fieldData.RowNum()
			}

			// force to flush, called at the end of Read()
			if force && rowCount > 0 {
				printFieldsDataInfo(blockData, "import util: prepare to force flush a block", nil)
				err := callFlushFunc(blockData, i, partitionID)
				if err != nil {
					log.Warn("Import util: failed to force flush block data", zap.Int("shardID", i),
						zap.Int64("partitionID", partitionID), zap.Error(err))
					return merr.WrapErrImportFailed(fmt.Sprintf("failed to force flush block data for shard id %d to partition %d, error: %v", i, partitionID, err))
				}
				log.Info("Import util: force flush", zap.Int("rowCount", rowCount), zap.Int("size", size),
					zap.Int("shardID", i), zap.Int64("partitionID", partitionID))

				shardData[partitionID] = initBlockData(collectionSchema)
				if shardData[partitionID] == nil {
					log.Warn("Import util: failed to initialize FieldData list", zap.Int("shardID", i), zap.Int64("partitionID", partitionID))
					return merr.WrapErrImportFailed(fmt.Sprintf("failed to initialize FieldData list for shard id %d to partition %d", i, partitionID))
				}
				continue
			}

			// if segment size is larger than predefined blockSize, flush to create a new binlog file
			// initialize a new FieldData list for next round batch read
			if size > int(blockSize) && rowCount > 0 {
				printFieldsDataInfo(blockData, "import util: prepare to flush block larger than blockSize", nil)
				err := callFlushFunc(blockData, i, partitionID)
				if err != nil {
					log.Warn("Import util: failed to flush block data", zap.Int("shardID", i),
						zap.Int64("partitionID", partitionID), zap.Error(err))
					return merr.WrapErrImportFailed(fmt.Sprintf("failed to flush block data for shard id %d to partition %d, error: %v", i, partitionID, err))
				}
				log.Info("Import util: block size exceed limit and flush", zap.Int("rowCount", rowCount), zap.Int("size", size),
					zap.Int("shardID", i), zap.Int64("partitionID", partitionID), zap.Int64("blockSize", blockSize))

				shardData[partitionID] = initBlockData(collectionSchema)
				if shardData[partitionID] == nil {
					log.Warn("Import util: failed to initialize FieldData list", zap.Int("shardID", i), zap.Int64("partitionID", partitionID))
					return merr.WrapErrImportFailed(fmt.Sprintf("failed to initialize FieldData list for shard id %d to partition %d", i, partitionID))
				}
				continue
			}

			// calculate the total size (ignore the flushed blocks)
			// find out the biggest block for step 2
			totalSize += size
			if size > biggestSize {
				biggestSize = size
				biggestItem = i
				biggestPartition = partitionID
			}
		}
	}

	// 2. if the total accumulated data exceeds maxTotalSize, call callFlushFunc to flush the biggest block
	if totalSize > int(maxTotalSize) && biggestItem >= 0 && biggestPartition >= 0 {
		// outside context might be canceled(service stop, or future enhancement for canceling import task)
		if isCanceled(ctx) {
			log.Warn("Import util: import task was canceled")
			return merr.WrapErrImportFailed("import task was canceled")
		}

		blockData := shardsData[biggestItem][biggestPartition]
		err := fillDynamicData(blockData, collectionSchema)
		if err != nil {
			log.Warn("Import util: failed to fill dynamic field", zap.Error(err))
			return merr.WrapErrImportFailed(fmt.Sprintf("failed to fill dynamic field, error: %v", err))
		}

		// Note: even if rowCount is 0, the size is still non-zero
		size := 0
		rowCount := 0
		for _, fieldData := range blockData {
			size += fieldData.GetMemorySize()
			rowCount = fieldData.RowNum()
		}

		if rowCount > 0 {
			printFieldsDataInfo(blockData, "import util: prepare to flush biggest block", nil)
			err = callFlushFunc(blockData, biggestItem, biggestPartition)
			if err != nil {
				log.Warn("Import util: failed to flush biggest block data", zap.Int("shardID", biggestItem),
					zap.Int64("partitionID", biggestPartition))
				return merr.WrapErrImportFailed(fmt.Sprintf("failed to flush biggest block data for shard id %d to partition %d, error: %v",
					biggestItem, biggestPartition, err))
			}
			log.Info("Import util: total size exceed limit and flush", zap.Int("rowCount", rowCount),
				zap.Int("size", size), zap.Int("totalSize", totalSize), zap.Int("shardID", biggestItem))

			shardsData[biggestItem][biggestPartition] = initBlockData(collectionSchema)
			if shardsData[biggestItem][biggestPartition] == nil {
				log.Warn("Import util: failed to initialize FieldData list", zap.Int("shardID", biggestItem),
					zap.Int64("partitionID", biggestPartition))
				return merr.WrapErrImportFailed(fmt.Sprintf("failed to initialize FieldData list for shard id %d to partition %d", biggestItem, biggestPartition))
			}
		}
	}

	return nil
}
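
// Illustrative call (sizes are hypothetical, flushFunc is caller-provided): flush any
// block that grows past 16 MB, flush the biggest block once all pending blocks together
// exceed 64 MB, and pass force=true at the end of a read pass to flush whatever is left.
//
//	err := tryFlushBlocks(ctx, shardsData, collectionSchema, flushFunc, 16*1024*1024, 64*1024*1024, false)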

func getTypeName(dt schemapb.DataType) string {
	switch dt {
	case schemapb.DataType_Bool:
		return "Bool"
	case schemapb.DataType_Int8:
		return "Int8"
	case schemapb.DataType_Int16:
		return "Int16"
	case schemapb.DataType_Int32:
		return "Int32"
	case schemapb.DataType_Int64:
		return "Int64"
	case schemapb.DataType_Float:
		return "Float"
	case schemapb.DataType_Double:
		return "Double"
	case schemapb.DataType_VarChar:
		return "Varchar"
	case schemapb.DataType_String:
		return "String"
	case schemapb.DataType_BinaryVector:
		return "BinaryVector"
	case schemapb.DataType_FloatVector:
		return "FloatVector"
	case schemapb.DataType_JSON:
		return "JSON"
	default:
		return "InvalidType"
	}
}

func pkToShard(pk interface{}, shardNum uint32) (uint32, error) {
	var shard uint32
	strPK, ok := pk.(string)
	if ok {
		hash := typeutil.HashString2Uint32(strPK)
		shard = hash % shardNum
	} else {
		intPK, ok := pk.(int64)
		if !ok {
			log.Warn("parser: primary key field must be int64 or varchar")
			return 0, merr.WrapErrImportFailed("primary key field must be int64 or varchar")
		}
		hash, _ := typeutil.Hash32Int64(intPK)
		shard = hash % shardNum
	}

	return shard, nil
}
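
// Usage sketch (illustrative values): map a primary key to one of 4 shards.
//
//	shard, err := pkToShard(int64(100), 4)  // int64 pk -> typeutil.Hash32Int64 % 4
//	shard, err = pkToShard("user-100", 4)   // varchar pk -> typeutil.HashString2Uint32 % 4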

func UpdateKVInfo(infos *[]*commonpb.KeyValuePair, k string, v string) error {
	if infos == nil {
		return merr.WrapErrImportFailed("Import util: kv array pointer is nil")
	}

	found := false
	for _, kv := range *infos {
		if kv.GetKey() == k {
			kv.Value = v
			found = true
		}
	}
	if !found {
		*infos = append(*infos, &commonpb.KeyValuePair{Key: k, Value: v})
	}

	return nil
}
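
// Usage sketch (keys and values are illustrative): update an existing key in place,
// or append a new pair when the key is missing.
//
//	kvs := []*commonpb.KeyValuePair{{Key: "backup", Value: "false"}}
//	_ = UpdateKVInfo(&kvs, "backup", "true")   // kvs[0].Value becomes "true"
//	_ = UpdateKVInfo(&kvs, "persist", "true")  // appends {Key: "persist", Value: "true"}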

// appendFunc returns a function that appends the n-th row of src to target, based on the field data type
func appendFunc(schema *schemapb.FieldSchema) func(src storage.FieldData, n int, target storage.FieldData) error {
	switch schema.DataType {
	case schemapb.DataType_Bool:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.BoolFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(bool))
			return nil
		}
	case schemapb.DataType_Float:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.FloatFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(float32))
			return nil
		}
	case schemapb.DataType_Double:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.DoubleFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(float64))
			return nil
		}
	case schemapb.DataType_Int8:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.Int8FieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(int8))
			return nil
		}
	case schemapb.DataType_Int16:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.Int16FieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(int16))
			return nil
		}
	case schemapb.DataType_Int32:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.Int32FieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(int32))
			return nil
		}
	case schemapb.DataType_Int64:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.Int64FieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(int64))
			return nil
		}
	case schemapb.DataType_BinaryVector:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.BinaryVectorFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).([]byte)...)
			return nil
		}
	case schemapb.DataType_FloatVector:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.FloatVectorFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).([]float32)...)
			return nil
		}
	case schemapb.DataType_String, schemapb.DataType_VarChar:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.StringFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(string))
			return nil
		}
	case schemapb.DataType_JSON:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.JSONFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).([]byte))
			return nil
		}
	case schemapb.DataType_Array:
		return func(src storage.FieldData, n int, target storage.FieldData) error {
			arr := target.(*storage.ArrayFieldData)
			arr.Data = append(arr.Data, src.GetRow(n).(*schemapb.ScalarField))
			return nil
		}
	default:
		return nil
	}
}

func prepareAppendFunctions(collectionInfo *CollectionInfo) (map[string]func(src storage.FieldData, n int, target storage.FieldData) error, error) {
	appendFunctions := make(map[string]func(src storage.FieldData, n int, target storage.FieldData) error)
	for i := 0; i < len(collectionInfo.Schema.Fields); i++ {
		schema := collectionInfo.Schema.Fields[i]
		appendFuncErr := appendFunc(schema)
		if appendFuncErr == nil {
			log.Warn("parser: unsupported field data type")
			return nil, fmt.Errorf("unsupported field data type: %d", schema.GetDataType())
		}
		appendFunctions[schema.GetName()] = appendFuncErr
	}
	return appendFunctions, nil
}

// checkRowCount checks the row count of each field; all fields must have the same row count
func checkRowCount(collectionInfo *CollectionInfo, fieldsData BlockData) (int, error) {
	rowCount := 0
	rowCounter := make(map[string]int)
	for i := 0; i < len(collectionInfo.Schema.Fields); i++ {
		schema := collectionInfo.Schema.Fields[i]
		if !schema.GetAutoID() {
			v, ok := fieldsData[schema.GetFieldID()]
			if !ok {
				if schema.GetIsDynamic() {
					// user might not provide numpy file for dynamic field, skip it, will auto-generate later
					continue
				}
				log.Warn("field not provided", zap.String("fieldName", schema.GetName()))
				return 0, fmt.Errorf("field '%s' not provided", schema.GetName())
			}
			rowCounter[schema.GetName()] = v.RowNum()
			if v.RowNum() > rowCount {
				rowCount = v.RowNum()
			}
		}
	}

	for name, count := range rowCounter {
		if count != rowCount {
			log.Warn("field row count is not equal to other fields row count", zap.String("fieldName", name),
				zap.Int("rowCount", count), zap.Int("otherRowCount", rowCount))
			return 0, fmt.Errorf("field '%s' row count %d is not equal to other fields row count: %d", name, count, rowCount)
		}
	}

	return rowCount, nil
}

// hashToPartition hashes the partition key to get a partition ID; it returns the first partition ID if no partition key exists
// CollectionInfo ensures there is only one partition ID in PartitionIDs if no partition key exists
func hashToPartition(collectionInfo *CollectionInfo, fieldsData BlockData, rowNumber int) (int64, error) {
	if collectionInfo.PartitionKey == nil {
		// no partition key, directly return the target partition id
		if len(collectionInfo.PartitionIDs) != 1 {
			return 0, fmt.Errorf("collection '%s' partition list is empty", collectionInfo.Schema.Name)
		}
		return collectionInfo.PartitionIDs[0], nil
	}

	partitionKeyID := collectionInfo.PartitionKey.GetFieldID()
	fieldData := fieldsData[partitionKeyID]
	value := fieldData.GetRow(rowNumber)
	index, err := pkToShard(value, uint32(len(collectionInfo.PartitionIDs)))
	if err != nil {
		return 0, err
	}

	return collectionInfo.PartitionIDs[index], nil
}

// splitFieldsData splits the in-memory data (parsed from column-based files) into shards
func splitFieldsData(collectionInfo *CollectionInfo, fieldsData BlockData, shards []ShardData, rowIDAllocator *allocator.IDAllocator) ([]int64, error) {
	if len(fieldsData) == 0 {
		log.Warn("fields data to split is empty")
		return nil, fmt.Errorf("fields data to split is empty")
	}

	if len(shards) != int(collectionInfo.ShardNum) {
		log.Warn("block count is not equal to collection shard number", zap.Int("shardsLen", len(shards)),
			zap.Int32("shardNum", collectionInfo.ShardNum))
		return nil, fmt.Errorf("block count %d is not equal to collection shard number %d", len(shards), collectionInfo.ShardNum)
	}

	rowCount, err := checkRowCount(collectionInfo, fieldsData)
	if err != nil {
		return nil, err
	}

	// generate auto id for primary key and rowid field
	rowIDBegin, rowIDEnd, err := rowIDAllocator.Alloc(uint32(rowCount))
	if err != nil {
		log.Warn("failed to alloc row ID", zap.Int("rowCount", rowCount), zap.Error(err))
		return nil, fmt.Errorf("failed to alloc %d rows ID, error: %w", rowCount, err)
	}

	rowIDField, ok := fieldsData[common.RowIDField]
	if !ok {
		rowIDField = &storage.Int64FieldData{
			Data: make([]int64, 0),
		}
		fieldsData[common.RowIDField] = rowIDField
	}
	rowIDFieldArr := rowIDField.(*storage.Int64FieldData)
	for i := rowIDBegin; i < rowIDEnd; i++ {
		rowIDFieldArr.Data = append(rowIDFieldArr.Data, i)
	}

	// reset the primary keys, as we know, only int64 pk can be auto-generated
	primaryKey := collectionInfo.PrimaryKey
	autoIDRange := make([]int64, 0)
	if primaryKey.GetAutoID() {
		log.Info("generating auto-id", zap.Int("rowCount", rowCount), zap.Int64("rowIDBegin", rowIDBegin))
		if primaryKey.GetDataType() != schemapb.DataType_Int64 {
			log.Warn("primary key field is auto-generated but the field type is not int64")
			return nil, fmt.Errorf("primary key field is auto-generated but the field type is not int64")
		}

		primaryDataArr := &storage.Int64FieldData{
			Data: make([]int64, 0, rowCount),
		}
		for i := rowIDBegin; i < rowIDEnd; i++ {
			primaryDataArr.Data = append(primaryDataArr.Data, i)
		}

		fieldsData[primaryKey.GetFieldID()] = primaryDataArr
		autoIDRange = append(autoIDRange, rowIDBegin, rowIDEnd)
	}

	// if the primary key is not auto-generated and the user doesn't provide it, return an error
	primaryData, ok := fieldsData[primaryKey.GetFieldID()]
	if !ok || primaryData.RowNum() <= 0 {
		log.Warn("primary key field is not provided", zap.String("keyName", primaryKey.GetName()))
		return nil, fmt.Errorf("primary key '%s' field data is not provided", primaryKey.GetName())
	}

	// prepare append functions
	appendFunctions, err := prepareAppendFunctions(collectionInfo)
	if err != nil {
		return nil, err
	}

	// split data into shards
	for i := 0; i < rowCount; i++ {
		// hash to a shard number and partition
		pk := primaryData.GetRow(i)
		shard, err := pkToShard(pk, uint32(collectionInfo.ShardNum))
		if err != nil {
			return nil, err
		}

		partitionID, err := hashToPartition(collectionInfo, fieldsData, i)
		if err != nil {
			return nil, err
		}

		// set rowID field
		rowIDField := shards[shard][partitionID][common.RowIDField].(*storage.Int64FieldData)
		rowIDField.Data = append(rowIDField.Data, rowIDFieldArr.GetRow(i).(int64))

		// append row to shard
		for k := 0; k < len(collectionInfo.Schema.Fields); k++ {
			schema := collectionInfo.Schema.Fields[k]
			srcData := fieldsData[schema.GetFieldID()]
			targetData := shards[shard][partitionID][schema.GetFieldID()]
			if srcData == nil && schema.GetIsDynamic() {
				// user might not provide numpy file for dynamic field, skip it, will auto-generate later
				continue
			}
			if srcData == nil || targetData == nil {
				log.Warn("cannot append data since source or target field data is nil",
					zap.String("FieldName", schema.GetName()),
					zap.Bool("sourceNil", srcData == nil), zap.Bool("targetNil", targetData == nil))
				return nil, fmt.Errorf("cannot append data for field '%s', possibly no fields correspond to this numpy file, or a required numpy file is not provided",
					schema.GetName())
			}
			appendFunc := appendFunctions[schema.GetName()]
			err := appendFunc(srcData, i, targetData)
			if err != nil {
				return nil, err
			}
		}
	}

	return autoIDRange, nil
}
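
// Illustrative flow (idAllocator is the caller's *allocator.IDAllocator): splitFieldsData
// hashes each row's primary key to a shard and its partition key (if any) to a partition,
// appends the row into shards[shard][partition], and returns the auto-generated ID range
// as []int64{begin, end} when the primary key is autoID (an empty slice otherwise).
//
//	autoIDRange, err := splitFieldsData(collectionInfo, fieldsData, shards, idAllocator)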