2020-12-09 12:07:27 +00:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
2021-01-28 09:25:43 +00:00
|
|
|
"encoding/json"
|
2020-12-09 12:07:27 +00:00
|
|
|
"fmt"
|
2020-12-23 03:34:35 +00:00
|
|
|
"sort"
|
|
|
|
"strconv"
|
|
|
|
"strings"
|
2020-12-09 12:07:27 +00:00
|
|
|
|
2020-12-11 10:14:19 +00:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/errors"
|
2021-01-30 08:41:47 +00:00
|
|
|
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
|
2020-12-09 12:07:27 +00:00
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
|
|
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
|
|
|
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
|
|
|
)
|
|
|
|
|
2020-12-23 10:06:04 +00:00
|
|
|
// Well-known blob keys produced by the codecs in this file.
const (
	// Ts is the key of the blob that holds the timestamps of a data
	// definition log.
	Ts = "ts"
	// DDL is the key of the blob that holds the data definition requests.
	DDL = "ddl"
	// IndexParamsFile is the key of the blob that carries the JSON-encoded
	// index parameters, index name and index ID.
	IndexParamsFile = "indexParams"
)
|
|
|
|
|
2020-12-09 12:07:27 +00:00
|
|
|
type (
|
|
|
|
UniqueID = typeutil.UniqueID
|
2020-12-11 04:01:20 +00:00
|
|
|
FieldID = typeutil.UniqueID
|
2020-12-09 12:07:27 +00:00
|
|
|
Timestamp = typeutil.Timestamp
|
|
|
|
)
|
|
|
|
|
|
|
|
// Blob is a key/value pair: Key names a serialized log and Value holds its
// bytes.
type Blob struct {
	Key   string
	Value []byte
}

// BlobList is a slice of blobs that implements sort.Interface. Blobs are
// ordered by the decimal integer that forms the last "/"-separated segment of
// their key (the log index in the blob key examples of this file).
type BlobList []*Blob

// Len returns the number of blobs in the list.
func (s BlobList) Len() int {
	return len(s)
}

// Less reports whether the log index of blob i is smaller than that of blob j.
func (s BlobList) Less(i, j int) bool {
	return blobLogIndex(s[i].Key) < blobLogIndex(s[j].Key)
}

// Swap exchanges the blobs at positions i and j.
func (s BlobList) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// blobLogIndex extracts the trailing log index from a blob key.
//
// The index is parsed as a base-10, 64-bit integer. (The previous code passed
// base 0 and bitSize 10 to strconv.ParseInt, which clamped every index above
// 511 to 511 and interpreted a leading "0" as octal.)
func blobLogIndex(key string) int64 {
	// LastIndex returns -1 when the key has no "/", which makes the whole key
	// the suffix.
	suffix := key[strings.LastIndex(key, "/")+1:]
	// The error is ignored on purpose: sort.Interface cannot report it, and a
	// non-numeric suffix (for example the "ts" and "ddl" keys) then sorts as
	// index 0.
	idx, _ := strconv.ParseInt(suffix, 10, 64)
	return idx
}

// NewBlob returns a blob with the given key and value. The value slice is
// stored as is, not copied.
func NewBlob(key string, value []byte) *Blob {
	return &Blob{Key: key, Value: value}
}

// GetKey returns the key of the blob.
func (b Blob) GetKey() string {
	return b.Key
}

// GetValue returns the value of the blob.
func (b Blob) GetValue() []byte {
	return b.Value
}
|
|
|
|
|
2020-12-09 12:07:27 +00:00
|
|
|
// Column containers used by InsertData. In every container NumRows counts the
// rows accumulated so far and Data holds their values.
type (
	// FieldData is the column of a single field; the codecs store pointers to
	// one of the concrete *FieldData structs below in it.
	FieldData interface{}

	// BoolFieldData is a column of bool values.
	BoolFieldData struct {
		NumRows int
		Data    []bool
	}

	// Int8FieldData is a column of int8 values.
	Int8FieldData struct {
		NumRows int
		Data    []int8
	}

	// Int16FieldData is a column of int16 values.
	Int16FieldData struct {
		NumRows int
		Data    []int16
	}

	// Int32FieldData is a column of int32 values.
	Int32FieldData struct {
		NumRows int
		Data    []int32
	}

	// Int64FieldData is a column of int64 values.
	Int64FieldData struct {
		NumRows int
		Data    []int64
	}

	// FloatFieldData is a column of float32 values.
	FloatFieldData struct {
		NumRows int
		Data    []float32
	}

	// DoubleFieldData is a column of float64 values.
	DoubleFieldData struct {
		NumRows int
		Data    []float64
	}

	// StringFieldData is a column of string values.
	StringFieldData struct {
		NumRows int
		Data    []string
	}

	// BinaryVectorFieldData is a column of binary vectors. Data concatenates
	// the bytes of all vectors and Dim is the vector dimension.
	BinaryVectorFieldData struct {
		NumRows int
		Data    []byte
		Dim     int
	}

	// FloatVectorFieldData is a column of float vectors. Data concatenates
	// the components of all vectors and Dim is the vector dimension.
	FloatVectorFieldData struct {
		NumRows int
		Data    []float32
		Dim     int
	}
)
|
|
|
|
|
|
|
|
// system filed id:
|
|
|
|
// 0: unique row id
|
|
|
|
// 1: timestamp
|
|
|
|
// 100: first user field id
|
|
|
|
// 101: second user field id
|
|
|
|
// 102: ...
|
|
|
|
|
|
|
|
// example row_schema: {float_field, int_field, float_vector_field, string_field}
|
|
|
|
// Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>}
|
|
|
|
type InsertData struct {
|
2020-12-11 04:01:20 +00:00
|
|
|
Data map[FieldID]FieldData // field id to field data
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Blob key example:
|
2020-12-18 07:21:25 +00:00
|
|
|
// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
|
2020-12-09 12:07:27 +00:00
|
|
|
type InsertCodec struct {
|
2020-12-23 10:06:04 +00:00
|
|
|
Schema *etcdpb.CollectionMeta
|
2020-12-09 12:07:27 +00:00
|
|
|
readerCloseFunc []func() error
|
|
|
|
}
|
|
|
|
|
2020-12-23 10:06:04 +00:00
|
|
|
func NewInsertCodec(schema *etcdpb.CollectionMeta) *InsertCodec {
|
|
|
|
return &InsertCodec{Schema: schema}
|
|
|
|
}
|
|
|
|
|
2020-12-11 03:29:07 +00:00
|
|
|
func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
|
2020-12-09 12:07:27 +00:00
|
|
|
var blobs []*Blob
|
|
|
|
var writer *InsertBinlogWriter
|
|
|
|
var err error
|
2020-12-18 07:21:25 +00:00
|
|
|
timeFieldData, ok := data.Data[ms.TimeStampField]
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.New("data doesn't contains timestamp field")
|
|
|
|
}
|
2020-12-23 03:34:35 +00:00
|
|
|
ts := timeFieldData.(*Int64FieldData).Data
|
2020-12-09 12:07:27 +00:00
|
|
|
|
2020-12-11 03:29:07 +00:00
|
|
|
for _, field := range insertCodec.Schema.Schema.Fields {
|
|
|
|
singleData := data.Data[field.FieldID]
|
|
|
|
writer, err = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
eventWriter, err := writer.NextInsertEventWriter()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
|
|
|
|
eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
|
|
|
|
switch field.DataType {
|
|
|
|
case schemapb.DataType_BOOL:
|
2020-12-23 03:34:35 +00:00
|
|
|
err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
|
2020-12-11 03:29:07 +00:00
|
|
|
case schemapb.DataType_INT8:
|
2020-12-23 03:34:35 +00:00
|
|
|
err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data)
|
2020-12-11 03:29:07 +00:00
|
|
|
case schemapb.DataType_INT16:
|
2020-12-23 03:34:35 +00:00
|
|
|
err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data)
|
2020-12-11 03:29:07 +00:00
|
|
|
case schemapb.DataType_INT32:
|
2020-12-23 03:34:35 +00:00
|
|
|
err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data)
|
2020-12-11 03:29:07 +00:00
|
|
|
case schemapb.DataType_INT64:
|
2020-12-23 03:34:35 +00:00
|
|
|
err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data)
|
2020-12-11 03:29:07 +00:00
|
|
|
case schemapb.DataType_FLOAT:
|
2020-12-23 03:34:35 +00:00
|
|
|
err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data)
|
2020-12-11 03:29:07 +00:00
|
|
|
case schemapb.DataType_DOUBLE:
|
2020-12-23 03:34:35 +00:00
|
|
|
err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data)
|
2020-12-11 03:29:07 +00:00
|
|
|
case schemapb.DataType_STRING:
|
2020-12-23 03:34:35 +00:00
|
|
|
for _, singleString := range singleData.(*StringFieldData).Data {
|
2020-12-09 12:07:27 +00:00
|
|
|
err = eventWriter.AddOneStringToPayload(singleString)
|
2020-12-11 03:29:07 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
2020-12-11 03:29:07 +00:00
|
|
|
case schemapb.DataType_VECTOR_BINARY:
|
2020-12-23 03:34:35 +00:00
|
|
|
err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim)
|
2020-12-11 03:29:07 +00:00
|
|
|
case schemapb.DataType_VECTOR_FLOAT:
|
2020-12-23 03:34:35 +00:00
|
|
|
err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim)
|
2020-12-11 10:14:19 +00:00
|
|
|
default:
|
|
|
|
return nil, errors.Errorf("undefined data type %d", field.DataType)
|
2020-12-11 03:29:07 +00:00
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
|
|
|
if writer == nil {
|
2020-12-18 07:21:25 +00:00
|
|
|
return nil, errors.New("binlog writer is nil")
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
2020-12-10 01:57:14 +00:00
|
|
|
writer.SetStartTimeStamp(typeutil.Timestamp(ts[0]))
|
|
|
|
writer.SetEndTimeStamp(typeutil.Timestamp(ts[len(ts)-1]))
|
2020-12-09 12:07:27 +00:00
|
|
|
|
2020-12-11 03:29:07 +00:00
|
|
|
err = writer.Close()
|
2020-12-09 12:07:27 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2020-12-10 07:50:09 +00:00
|
|
|
buffer, err := writer.GetBuffer()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-12-18 07:21:25 +00:00
|
|
|
blobKey := fmt.Sprintf("%d", field.FieldID)
|
2020-12-09 12:07:27 +00:00
|
|
|
blobs = append(blobs, &Blob{
|
2020-12-23 03:34:35 +00:00
|
|
|
Key: blobKey,
|
|
|
|
Value: buffer,
|
2020-12-09 12:07:27 +00:00
|
|
|
})
|
|
|
|
|
|
|
|
}
|
|
|
|
return blobs, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) {
|
|
|
|
if len(blobs) == 0 {
|
2020-12-18 07:21:25 +00:00
|
|
|
return -1, -1, nil, errors.New("blobs is empty")
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
|
|
|
readerClose := func(reader *BinlogReader) func() error {
|
|
|
|
return func() error { return reader.Close() }
|
|
|
|
}
|
2020-12-23 03:34:35 +00:00
|
|
|
|
|
|
|
var blobList BlobList = blobs
|
|
|
|
sort.Sort(blobList)
|
|
|
|
|
2020-12-11 03:29:07 +00:00
|
|
|
var pID UniqueID
|
|
|
|
var sID UniqueID
|
2020-12-23 03:34:35 +00:00
|
|
|
resultData := &InsertData{}
|
2020-12-11 04:01:20 +00:00
|
|
|
resultData.Data = make(map[FieldID]FieldData)
|
2020-12-23 03:34:35 +00:00
|
|
|
for _, blob := range blobList {
|
|
|
|
binlogReader, err := NewBinlogReader(blob.Value)
|
2020-12-09 12:07:27 +00:00
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
|
2020-12-11 03:29:07 +00:00
|
|
|
// read partitionID and SegmentID
|
|
|
|
pID, sID = binlogReader.PartitionID, binlogReader.SegmentID
|
|
|
|
|
|
|
|
dataType := binlogReader.PayloadDataType
|
|
|
|
fieldID := binlogReader.FieldID
|
2020-12-23 03:34:35 +00:00
|
|
|
for {
|
2020-12-09 12:07:27 +00:00
|
|
|
eventReader, err := binlogReader.NextEventReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
2020-12-23 03:34:35 +00:00
|
|
|
if eventReader == nil {
|
|
|
|
break
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
2020-12-23 03:34:35 +00:00
|
|
|
switch dataType {
|
|
|
|
case schemapb.DataType_BOOL:
|
|
|
|
if resultData.Data[fieldID] == nil {
|
|
|
|
resultData.Data[fieldID] = &BoolFieldData{}
|
|
|
|
}
|
|
|
|
boolFieldData := resultData.Data[fieldID].(*BoolFieldData)
|
|
|
|
singleData, err := eventReader.GetBoolFromPayload()
|
2020-12-09 12:07:27 +00:00
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
2020-12-23 03:34:35 +00:00
|
|
|
boolFieldData.Data = append(boolFieldData.Data, singleData...)
|
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
boolFieldData.NumRows += length
|
|
|
|
resultData.Data[fieldID] = boolFieldData
|
|
|
|
case schemapb.DataType_INT8:
|
|
|
|
if resultData.Data[fieldID] == nil {
|
|
|
|
resultData.Data[fieldID] = &Int8FieldData{}
|
|
|
|
}
|
|
|
|
int8FieldData := resultData.Data[fieldID].(*Int8FieldData)
|
|
|
|
singleData, err := eventReader.GetInt8FromPayload()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
int8FieldData.Data = append(int8FieldData.Data, singleData...)
|
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
int8FieldData.NumRows += length
|
|
|
|
resultData.Data[fieldID] = int8FieldData
|
|
|
|
case schemapb.DataType_INT16:
|
|
|
|
if resultData.Data[fieldID] == nil {
|
|
|
|
resultData.Data[fieldID] = &Int16FieldData{}
|
|
|
|
}
|
|
|
|
int16FieldData := resultData.Data[fieldID].(*Int16FieldData)
|
|
|
|
singleData, err := eventReader.GetInt16FromPayload()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
int16FieldData.Data = append(int16FieldData.Data, singleData...)
|
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
int16FieldData.NumRows += length
|
|
|
|
resultData.Data[fieldID] = int16FieldData
|
|
|
|
case schemapb.DataType_INT32:
|
|
|
|
if resultData.Data[fieldID] == nil {
|
|
|
|
resultData.Data[fieldID] = &Int32FieldData{}
|
|
|
|
}
|
|
|
|
int32FieldData := resultData.Data[fieldID].(*Int32FieldData)
|
|
|
|
singleData, err := eventReader.GetInt32FromPayload()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
int32FieldData.Data = append(int32FieldData.Data, singleData...)
|
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
int32FieldData.NumRows += length
|
|
|
|
resultData.Data[fieldID] = int32FieldData
|
|
|
|
case schemapb.DataType_INT64:
|
|
|
|
if resultData.Data[fieldID] == nil {
|
|
|
|
resultData.Data[fieldID] = &Int64FieldData{}
|
|
|
|
}
|
|
|
|
int64FieldData := resultData.Data[fieldID].(*Int64FieldData)
|
|
|
|
singleData, err := eventReader.GetInt64FromPayload()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
int64FieldData.Data = append(int64FieldData.Data, singleData...)
|
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
int64FieldData.NumRows += length
|
|
|
|
resultData.Data[fieldID] = int64FieldData
|
|
|
|
case schemapb.DataType_FLOAT:
|
|
|
|
if resultData.Data[fieldID] == nil {
|
|
|
|
resultData.Data[fieldID] = &FloatFieldData{}
|
|
|
|
}
|
|
|
|
floatFieldData := resultData.Data[fieldID].(*FloatFieldData)
|
|
|
|
singleData, err := eventReader.GetFloatFromPayload()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
floatFieldData.Data = append(floatFieldData.Data, singleData...)
|
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
floatFieldData.NumRows += length
|
|
|
|
resultData.Data[fieldID] = floatFieldData
|
|
|
|
case schemapb.DataType_DOUBLE:
|
|
|
|
if resultData.Data[fieldID] == nil {
|
|
|
|
resultData.Data[fieldID] = &DoubleFieldData{}
|
|
|
|
}
|
|
|
|
doubleFieldData := resultData.Data[fieldID].(*DoubleFieldData)
|
|
|
|
singleData, err := eventReader.GetDoubleFromPayload()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
doubleFieldData.Data = append(doubleFieldData.Data, singleData...)
|
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
doubleFieldData.NumRows += length
|
|
|
|
resultData.Data[fieldID] = doubleFieldData
|
|
|
|
case schemapb.DataType_STRING:
|
|
|
|
if resultData.Data[fieldID] == nil {
|
|
|
|
resultData.Data[fieldID] = &StringFieldData{}
|
|
|
|
}
|
|
|
|
stringFieldData := resultData.Data[fieldID].(*StringFieldData)
|
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
stringFieldData.NumRows += length
|
|
|
|
for i := 0; i < length; i++ {
|
|
|
|
singleString, err := eventReader.GetOneStringFromPayload(i)
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
stringFieldData.Data = append(stringFieldData.Data, singleString)
|
|
|
|
}
|
|
|
|
resultData.Data[fieldID] = stringFieldData
|
|
|
|
case schemapb.DataType_VECTOR_BINARY:
|
|
|
|
if resultData.Data[fieldID] == nil {
|
|
|
|
resultData.Data[fieldID] = &BinaryVectorFieldData{}
|
|
|
|
}
|
|
|
|
binaryVectorFieldData := resultData.Data[fieldID].(*BinaryVectorFieldData)
|
|
|
|
var singleData []byte
|
|
|
|
singleData, binaryVectorFieldData.Dim, err = eventReader.GetBinaryVectorFromPayload()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...)
|
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
binaryVectorFieldData.NumRows += length
|
|
|
|
resultData.Data[fieldID] = binaryVectorFieldData
|
|
|
|
case schemapb.DataType_VECTOR_FLOAT:
|
|
|
|
if resultData.Data[fieldID] == nil {
|
|
|
|
resultData.Data[fieldID] = &FloatVectorFieldData{}
|
|
|
|
}
|
|
|
|
floatVectorFieldData := resultData.Data[fieldID].(*FloatVectorFieldData)
|
|
|
|
var singleData []float32
|
|
|
|
singleData, floatVectorFieldData.Dim, err = eventReader.GetFloatVectorFromPayload()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...)
|
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return -1, -1, nil, err
|
|
|
|
}
|
|
|
|
floatVectorFieldData.NumRows += length
|
|
|
|
resultData.Data[fieldID] = floatVectorFieldData
|
|
|
|
default:
|
|
|
|
return -1, -1, nil, errors.Errorf("undefined data type %d", dataType)
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
|
|
|
}
|
2020-12-23 03:34:35 +00:00
|
|
|
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
|
|
|
|
2020-12-23 03:34:35 +00:00
|
|
|
return pID, sID, resultData, nil
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (insertCodec *InsertCodec) Close() error {
|
|
|
|
for _, closeFunc := range insertCodec.readerCloseFunc {
|
|
|
|
err := closeFunc()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// DataDefinitionCodec converts data definition requests and their timestamps
// to and from binlog blobs.
//
// Blob key example:
// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
type DataDefinitionCodec struct {
	// collectionID is passed to every DDL binlog writer created by Serialize.
	collectionID int64
	// readerCloseFunc collects the close functions of the binlog readers
	// opened by Deserialize; Close runs them.
	readerCloseFunc []func() error
}

// NewDataDefinitionCodec returns a codec for the given collection.
func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec {
	codec := new(DataDefinitionCodec)
	codec.collectionID = collectionID
	return codec
}
|
|
|
|
|
2020-12-11 03:29:07 +00:00
|
|
|
func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
|
2020-12-23 10:06:04 +00:00
|
|
|
writer, err := NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.collectionID)
|
2020-12-09 12:07:27 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
var blobs []*Blob
|
|
|
|
|
2020-12-23 10:06:04 +00:00
|
|
|
eventWriter, err := writer.NextCreateCollectionEventWriter()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var int64Ts []int64
|
|
|
|
for _, singleTs := range ts {
|
|
|
|
int64Ts = append(int64Ts, int64(singleTs))
|
|
|
|
}
|
|
|
|
err = eventWriter.AddInt64ToPayload(int64Ts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
eventWriter.SetStartTimestamp(ts[0])
|
|
|
|
eventWriter.SetEndTimestamp(ts[len(ts)-1])
|
|
|
|
writer.SetStartTimeStamp(ts[0])
|
|
|
|
writer.SetEndTimeStamp(ts[len(ts)-1])
|
|
|
|
err = writer.Close()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
buffer, err := writer.GetBuffer()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
blobs = append(blobs, &Blob{
|
|
|
|
Key: Ts,
|
|
|
|
Value: buffer,
|
|
|
|
})
|
|
|
|
|
|
|
|
writer, err = NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.collectionID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2020-12-09 12:07:27 +00:00
|
|
|
for pos, req := range ddRequests {
|
|
|
|
switch eventTypes[pos] {
|
|
|
|
case CreateCollectionEventType:
|
|
|
|
eventWriter, err := writer.NextCreateCollectionEventWriter()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
err = eventWriter.AddOneStringToPayload(req)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
eventWriter.SetStartTimestamp(ts[pos])
|
|
|
|
eventWriter.SetEndTimestamp(ts[pos])
|
|
|
|
case DropCollectionEventType:
|
|
|
|
eventWriter, err := writer.NextDropCollectionEventWriter()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
err = eventWriter.AddOneStringToPayload(req)
|
|
|
|
eventWriter.SetStartTimestamp(ts[pos])
|
|
|
|
eventWriter.SetEndTimestamp(ts[pos])
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
case CreatePartitionEventType:
|
|
|
|
eventWriter, err := writer.NextCreatePartitionEventWriter()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
err = eventWriter.AddOneStringToPayload(req)
|
|
|
|
eventWriter.SetStartTimestamp(ts[pos])
|
|
|
|
eventWriter.SetEndTimestamp(ts[pos])
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
case DropPartitionEventType:
|
|
|
|
eventWriter, err := writer.NextDropPartitionEventWriter()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
err = eventWriter.AddOneStringToPayload(req)
|
|
|
|
eventWriter.SetStartTimestamp(ts[pos])
|
|
|
|
eventWriter.SetEndTimestamp(ts[pos])
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-12-11 03:29:07 +00:00
|
|
|
writer.SetStartTimeStamp(ts[0])
|
|
|
|
writer.SetEndTimeStamp(ts[len(ts)-1])
|
2020-12-09 12:07:27 +00:00
|
|
|
err = writer.Close()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
buffer, err = writer.GetBuffer()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-12-09 12:07:27 +00:00
|
|
|
blobs = append(blobs, &Blob{
|
2020-12-23 10:06:04 +00:00
|
|
|
Key: DDL,
|
2020-12-23 03:34:35 +00:00
|
|
|
Value: buffer,
|
2020-12-09 12:07:27 +00:00
|
|
|
})
|
|
|
|
|
|
|
|
return blobs, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2020-12-10 01:57:14 +00:00
|
|
|
func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) {
|
2020-12-09 12:07:27 +00:00
|
|
|
if len(blobs) == 0 {
|
2020-12-18 07:21:25 +00:00
|
|
|
return nil, nil, errors.New("blobs is empty")
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
|
|
|
readerClose := func(reader *BinlogReader) func() error {
|
|
|
|
return func() error { return reader.Close() }
|
|
|
|
}
|
|
|
|
var requestsStrings []string
|
|
|
|
var resultTs []Timestamp
|
2020-12-23 03:34:35 +00:00
|
|
|
|
|
|
|
var blobList BlobList = blobs
|
|
|
|
sort.Sort(blobList)
|
|
|
|
|
|
|
|
for _, blob := range blobList {
|
|
|
|
binlogReader, err := NewBinlogReader(blob.Value)
|
2020-12-09 12:07:27 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
2020-12-18 07:21:25 +00:00
|
|
|
dataType := binlogReader.PayloadDataType
|
2020-12-11 03:29:07 +00:00
|
|
|
|
2020-12-23 03:34:35 +00:00
|
|
|
for {
|
2020-12-09 12:07:27 +00:00
|
|
|
eventReader, err := binlogReader.NextEventReader()
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
2020-12-23 03:34:35 +00:00
|
|
|
if eventReader == nil {
|
|
|
|
break
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
2020-12-23 03:34:35 +00:00
|
|
|
switch dataType {
|
|
|
|
case schemapb.DataType_INT64:
|
|
|
|
int64Ts, err := eventReader.GetInt64FromPayload()
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
for _, singleTs := range int64Ts {
|
|
|
|
resultTs = append(resultTs, Timestamp(singleTs))
|
|
|
|
}
|
|
|
|
case schemapb.DataType_STRING:
|
2020-12-09 12:07:27 +00:00
|
|
|
length, err := eventReader.GetPayloadLengthFromReader()
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
for i := 0; i < length; i++ {
|
|
|
|
singleString, err := eventReader.GetOneStringFromPayload(i)
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
requestsStrings = append(requestsStrings, singleString)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-23 03:34:35 +00:00
|
|
|
dataDefinitionCodec.readerCloseFunc = append(dataDefinitionCodec.readerCloseFunc, readerClose(binlogReader))
|
2020-12-09 12:07:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return resultTs, requestsStrings, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (dataDefinitionCodec *DataDefinitionCodec) Close() error {
|
|
|
|
for _, closeFunc := range dataDefinitionCodec.readerCloseFunc {
|
|
|
|
err := closeFunc()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
2020-12-11 03:29:07 +00:00
|
|
|
|
2020-12-18 07:44:27 +00:00
|
|
|
//type IndexCodec struct {
|
|
|
|
// Base
|
|
|
|
// readerCloseFunc []func() error
|
|
|
|
//}
|
|
|
|
//
|
|
|
|
////func (builder *IndexBuilder) Build(fieldData FieldData, typeParams map[string]string, indexParams map[string]string) ([]*Blob, error) {}
|
|
|
|
//func (indexCodec *IndexCodec) Serialize(indexSlices []*Blob) ([]*Blob, error) {}
|
|
|
|
//
|
|
|
|
//// TODO: describe inputs and return
|
|
|
|
//func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, error) {}
|
|
|
|
|
2020-12-11 03:29:07 +00:00
|
|
|
// IndexCodec bundles index blobs with their JSON-encoded index parameters.
// It has no fields.
type IndexCodec struct{}

// NewIndexCodec returns a new IndexCodec.
func NewIndexCodec() *IndexCodec {
	return new(IndexCodec)
}
|
|
|
|
|
2021-02-03 03:52:19 +00:00
|
|
|
func (indexCodec *IndexCodec) Serialize(blobs []*Blob, params map[string]string, indexName string, indexID UniqueID) ([]*Blob, error) {
|
|
|
|
paramsBytes, err := json.Marshal(struct {
|
|
|
|
Params map[string]string
|
|
|
|
IndexName string
|
|
|
|
IndexID UniqueID
|
|
|
|
}{
|
|
|
|
Params: params,
|
|
|
|
IndexName: indexName,
|
|
|
|
IndexID: indexID,
|
|
|
|
})
|
2021-01-28 09:25:43 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-01-29 07:22:24 +00:00
|
|
|
blobs = append(blobs, &Blob{Key: IndexParamsFile, Value: paramsBytes})
|
2020-12-11 03:29:07 +00:00
|
|
|
return blobs, nil
|
|
|
|
}
|
|
|
|
|
2021-02-03 03:52:19 +00:00
|
|
|
func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, map[string]string, string, UniqueID, error) {
|
|
|
|
var file *Blob
|
2021-01-28 09:25:43 +00:00
|
|
|
for i := 0; i < len(blobs); i++ {
|
2021-01-29 07:22:24 +00:00
|
|
|
if blobs[i].Key != IndexParamsFile {
|
2021-01-28 09:25:43 +00:00
|
|
|
continue
|
|
|
|
}
|
2021-02-03 03:52:19 +00:00
|
|
|
file = blobs[i]
|
2021-01-28 09:25:43 +00:00
|
|
|
blobs = append(blobs[:i], blobs[i+1:]...)
|
|
|
|
break
|
|
|
|
}
|
2021-02-03 03:52:19 +00:00
|
|
|
if file == nil {
|
|
|
|
return nil, nil, "", -1, errors.New("can not find params blob")
|
2021-01-28 09:25:43 +00:00
|
|
|
}
|
2021-02-03 03:52:19 +00:00
|
|
|
info := struct {
|
|
|
|
Params map[string]string
|
|
|
|
IndexName string
|
|
|
|
IndexID UniqueID
|
|
|
|
}{}
|
|
|
|
if err := json.Unmarshal(file.Value, &info); err != nil {
|
|
|
|
return nil, nil, "", -1, errors.New("json unmarshal error: " + err.Error())
|
|
|
|
}
|
|
|
|
|
|
|
|
return blobs, info.Params, info.IndexName, info.IndexID, nil
|
2020-12-11 03:29:07 +00:00
|
|
|
}
|