mirror of https://github.com/milvus-io/milvus.git
				
				
				
			
		
			
				
	
	
		
			340 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			340 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
// Licensed to the LF AI & Data foundation under one
 | 
						|
// or more contributor license agreements. See the NOTICE file
 | 
						|
// distributed with this work for additional information
 | 
						|
// regarding copyright ownership. The ASF licenses this file
 | 
						|
// to you under the Apache License, Version 2.0 (the
 | 
						|
// "License"); you may not use this file except in compliance
 | 
						|
// with the License. You may obtain a copy of the License at
 | 
						|
//
 | 
						|
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package client
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"strings"
 | 
						|
 | 
						|
	"github.com/cockroachdb/errors"
 | 
						|
	"github.com/samber/lo"
 | 
						|
 | 
						|
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 | 
						|
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 | 
						|
	"github.com/milvus-io/milvus/client/v2/column"
 | 
						|
	"github.com/milvus-io/milvus/client/v2/entity"
 | 
						|
	"github.com/milvus-io/milvus/client/v2/row"
 | 
						|
)
 | 
						|
 | 
						|
type InsertOption interface {
 | 
						|
	InsertRequest(coll *entity.Collection) (*milvuspb.InsertRequest, error)
 | 
						|
	CollectionName() string
 | 
						|
}
 | 
						|
 | 
						|
type UpsertOption interface {
 | 
						|
	UpsertRequest(coll *entity.Collection) (*milvuspb.UpsertRequest, error)
 | 
						|
	CollectionName() string
 | 
						|
}
 | 
						|
 | 
						|
var (
 | 
						|
	_ UpsertOption = (*columnBasedDataOption)(nil)
 | 
						|
	_ InsertOption = (*columnBasedDataOption)(nil)
 | 
						|
)
 | 
						|
 | 
						|
type columnBasedDataOption struct {
 | 
						|
	collName      string
 | 
						|
	partitionName string
 | 
						|
	columns       []column.Column
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) processInsertColumns(colSchema *entity.Schema, columns ...column.Column) ([]*schemapb.FieldData, int, error) {
 | 
						|
	// setup dynamic related var
 | 
						|
	isDynamic := colSchema.EnableDynamicField
 | 
						|
 | 
						|
	// check columns and field matches
 | 
						|
	var rowSize int
 | 
						|
	mNameField := make(map[string]*entity.Field)
 | 
						|
	for _, field := range colSchema.Fields {
 | 
						|
		mNameField[field.Name] = field
 | 
						|
	}
 | 
						|
	mNameColumn := make(map[string]column.Column)
 | 
						|
	var dynamicColumns []column.Column
 | 
						|
	for _, col := range columns {
 | 
						|
		_, dup := mNameColumn[col.Name()]
 | 
						|
		if dup {
 | 
						|
			return nil, 0, fmt.Errorf("duplicated column %s found", col.Name())
 | 
						|
		}
 | 
						|
		l := col.Len()
 | 
						|
		if rowSize == 0 {
 | 
						|
			rowSize = l
 | 
						|
		} else if rowSize != l {
 | 
						|
			return nil, 0, errors.New("column size not match")
 | 
						|
		}
 | 
						|
		field, has := mNameField[col.Name()]
 | 
						|
		if !has {
 | 
						|
			if !isDynamic {
 | 
						|
				return nil, 0, fmt.Errorf("field %s does not exist in collection %s", col.Name(), colSchema.CollectionName)
 | 
						|
			}
 | 
						|
			// add to dynamic column list for further processing
 | 
						|
			dynamicColumns = append(dynamicColumns, col)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		mNameColumn[col.Name()] = col
 | 
						|
		if col.Type() != field.DataType {
 | 
						|
			return nil, 0, fmt.Errorf("param column %s has type %v but collection field definition is %v", col.Name(), col.Type(), field.DataType)
 | 
						|
		}
 | 
						|
		if field.DataType == entity.FieldTypeFloatVector || field.DataType == entity.FieldTypeBinaryVector {
 | 
						|
			dim := 0
 | 
						|
			switch column := col.(type) {
 | 
						|
			case *column.ColumnFloatVector:
 | 
						|
				dim = column.Dim()
 | 
						|
			case *column.ColumnBinaryVector:
 | 
						|
				dim = column.Dim()
 | 
						|
			}
 | 
						|
			if fmt.Sprintf("%d", dim) != field.TypeParams[entity.TypeParamDim] {
 | 
						|
				return nil, 0, fmt.Errorf("params column %s vector dim %d not match collection definition, which has dim of %s", field.Name, dim, field.TypeParams[entity.TypeParamDim])
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// check all fixed field pass value
 | 
						|
	for _, field := range colSchema.Fields {
 | 
						|
		_, has := mNameColumn[field.Name]
 | 
						|
		if !has &&
 | 
						|
			!field.AutoID && !field.IsDynamic {
 | 
						|
			return nil, 0, fmt.Errorf("field %s not passed", field.Name)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	fieldsData := make([]*schemapb.FieldData, 0, len(mNameColumn)+1)
 | 
						|
	for _, fixedColumn := range mNameColumn {
 | 
						|
		fieldsData = append(fieldsData, fixedColumn.FieldData())
 | 
						|
	}
 | 
						|
	if len(dynamicColumns) > 0 {
 | 
						|
		// use empty column name here
 | 
						|
		col, err := opt.mergeDynamicColumns("", rowSize, dynamicColumns)
 | 
						|
		if err != nil {
 | 
						|
			return nil, 0, err
 | 
						|
		}
 | 
						|
		fieldsData = append(fieldsData, col)
 | 
						|
	}
 | 
						|
 | 
						|
	return fieldsData, rowSize, nil
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) mergeDynamicColumns(dynamicName string, rowSize int, columns []column.Column) (*schemapb.FieldData, error) {
 | 
						|
	values := make([][]byte, 0, rowSize)
 | 
						|
	for i := 0; i < rowSize; i++ {
 | 
						|
		m := make(map[string]interface{})
 | 
						|
		for _, column := range columns {
 | 
						|
			// range guaranteed
 | 
						|
			m[column.Name()], _ = column.Get(i)
 | 
						|
		}
 | 
						|
		bs, err := json.Marshal(m)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		values = append(values, bs)
 | 
						|
	}
 | 
						|
	return &schemapb.FieldData{
 | 
						|
		Type:      schemapb.DataType_JSON,
 | 
						|
		FieldName: dynamicName,
 | 
						|
		Field: &schemapb.FieldData_Scalars{
 | 
						|
			Scalars: &schemapb.ScalarField{
 | 
						|
				Data: &schemapb.ScalarField_JsonData{
 | 
						|
					JsonData: &schemapb.JSONArray{
 | 
						|
						Data: values,
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		IsDynamic: true,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) WithColumns(columns ...column.Column) *columnBasedDataOption {
 | 
						|
	opt.columns = append(opt.columns, columns...)
 | 
						|
	return opt
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) WithBoolColumn(colName string, data []bool) *columnBasedDataOption {
 | 
						|
	column := column.NewColumnBool(colName, data)
 | 
						|
	return opt.WithColumns(column)
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) WithInt8Column(colName string, data []int8) *columnBasedDataOption {
 | 
						|
	column := column.NewColumnInt8(colName, data)
 | 
						|
	return opt.WithColumns(column)
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) WithInt16Column(colName string, data []int16) *columnBasedDataOption {
 | 
						|
	column := column.NewColumnInt16(colName, data)
 | 
						|
	return opt.WithColumns(column)
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) WithInt32Column(colName string, data []int32) *columnBasedDataOption {
 | 
						|
	column := column.NewColumnInt32(colName, data)
 | 
						|
	return opt.WithColumns(column)
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) WithInt64Column(colName string, data []int64) *columnBasedDataOption {
 | 
						|
	column := column.NewColumnInt64(colName, data)
 | 
						|
	return opt.WithColumns(column)
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) WithVarcharColumn(colName string, data []string) *columnBasedDataOption {
 | 
						|
	column := column.NewColumnVarChar(colName, data)
 | 
						|
	return opt.WithColumns(column)
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) WithFloatVectorColumn(colName string, dim int, data [][]float32) *columnBasedDataOption {
 | 
						|
	column := column.NewColumnFloatVector(colName, dim, data)
 | 
						|
	return opt.WithColumns(column)
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) WithBinaryVectorColumn(colName string, dim int, data [][]byte) *columnBasedDataOption {
 | 
						|
	column := column.NewColumnBinaryVector(colName, dim, data)
 | 
						|
	return opt.WithColumns(column)
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) WithPartition(partitionName string) *columnBasedDataOption {
 | 
						|
	opt.partitionName = partitionName
 | 
						|
	return opt
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) CollectionName() string {
 | 
						|
	return opt.collName
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) InsertRequest(coll *entity.Collection) (*milvuspb.InsertRequest, error) {
 | 
						|
	fieldsData, rowNum, err := opt.processInsertColumns(coll.Schema, opt.columns...)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &milvuspb.InsertRequest{
 | 
						|
		CollectionName: opt.collName,
 | 
						|
		PartitionName:  opt.partitionName,
 | 
						|
		FieldsData:     fieldsData,
 | 
						|
		NumRows:        uint32(rowNum),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (opt *columnBasedDataOption) UpsertRequest(coll *entity.Collection) (*milvuspb.UpsertRequest, error) {
 | 
						|
	fieldsData, rowNum, err := opt.processInsertColumns(coll.Schema, opt.columns...)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &milvuspb.UpsertRequest{
 | 
						|
		CollectionName: opt.collName,
 | 
						|
		PartitionName:  opt.partitionName,
 | 
						|
		FieldsData:     fieldsData,
 | 
						|
		NumRows:        uint32(rowNum),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func NewColumnBasedInsertOption(collName string, columns ...column.Column) *columnBasedDataOption {
 | 
						|
	return &columnBasedDataOption{
 | 
						|
		columns:  columns,
 | 
						|
		collName: collName,
 | 
						|
		// leave partition name empty, using default partition
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type rowBasedDataOption struct {
 | 
						|
	*columnBasedDataOption
 | 
						|
	rows []any
 | 
						|
}
 | 
						|
 | 
						|
func NewRowBasedInsertOption(collName string, rows ...any) *rowBasedDataOption {
 | 
						|
	return &rowBasedDataOption{
 | 
						|
		columnBasedDataOption: &columnBasedDataOption{
 | 
						|
			collName: collName,
 | 
						|
		},
 | 
						|
		rows: rows,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (opt *rowBasedDataOption) InsertRequest(coll *entity.Collection) (*milvuspb.InsertRequest, error) {
 | 
						|
	columns, err := row.AnyToColumns(opt.rows, coll.Schema)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	opt.columnBasedDataOption.columns = columns
 | 
						|
	fieldsData, rowNum, err := opt.processInsertColumns(coll.Schema, opt.columns...)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &milvuspb.InsertRequest{
 | 
						|
		CollectionName: opt.collName,
 | 
						|
		PartitionName:  opt.partitionName,
 | 
						|
		FieldsData:     fieldsData,
 | 
						|
		NumRows:        uint32(rowNum),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (opt *rowBasedDataOption) UpsertRequest(coll *entity.Collection) (*milvuspb.UpsertRequest, error) {
 | 
						|
	columns, err := row.AnyToColumns(opt.rows, coll.Schema)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	opt.columnBasedDataOption.columns = columns
 | 
						|
	fieldsData, rowNum, err := opt.processInsertColumns(coll.Schema, opt.columns...)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &milvuspb.UpsertRequest{
 | 
						|
		CollectionName: opt.collName,
 | 
						|
		PartitionName:  opt.partitionName,
 | 
						|
		FieldsData:     fieldsData,
 | 
						|
		NumRows:        uint32(rowNum),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
type DeleteOption interface {
 | 
						|
	Request() *milvuspb.DeleteRequest
 | 
						|
}
 | 
						|
 | 
						|
type deleteOption struct {
 | 
						|
	collectionName string
 | 
						|
	partitionName  string
 | 
						|
	expr           string
 | 
						|
}
 | 
						|
 | 
						|
func (opt *deleteOption) Request() *milvuspb.DeleteRequest {
 | 
						|
	return &milvuspb.DeleteRequest{
 | 
						|
		CollectionName: opt.collectionName,
 | 
						|
		PartitionName:  opt.partitionName,
 | 
						|
		Expr:           opt.expr,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (opt *deleteOption) WithExpr(expr string) *deleteOption {
 | 
						|
	opt.expr = expr
 | 
						|
	return opt
 | 
						|
}
 | 
						|
 | 
						|
func (opt *deleteOption) WithInt64IDs(fieldName string, ids []int64) *deleteOption {
 | 
						|
	opt.expr = fmt.Sprintf("%s in %s", fieldName, strings.Join(strings.Fields(fmt.Sprint(ids)), ","))
 | 
						|
	return opt
 | 
						|
}
 | 
						|
 | 
						|
func (opt *deleteOption) WithStringIDs(fieldName string, ids []string) *deleteOption {
 | 
						|
	opt.expr = fmt.Sprintf("%s in [%s]", fieldName, strings.Join(lo.Map(ids, func(id string, _ int) string { return fmt.Sprintf("\"%s\"", id) }), ","))
 | 
						|
	return opt
 | 
						|
}
 | 
						|
 | 
						|
func (opt *deleteOption) WithPartition(partitionName string) *deleteOption {
 | 
						|
	opt.partitionName = partitionName
 | 
						|
	return opt
 | 
						|
}
 | 
						|
 | 
						|
func NewDeleteOption(collectionName string) *deleteOption {
 | 
						|
	return &deleteOption{collectionName: collectionName}
 | 
						|
}
 |