Refactor data codec code

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/4973/head^2
godchen 2020-12-23 15:46:16 +08:00 committed by yefu.chen
parent 0b9cffaf87
commit e52b130ba6
2 changed files with 153 additions and 204 deletions

View File

@ -13,6 +13,11 @@ import (
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
const (
Ts = "ts"
DDL = "ddl"
)
type (
UniqueID = typeutil.UniqueID
FieldID = typeutil.UniqueID
@ -54,13 +59,6 @@ func (b Blob) GetValue() []byte {
return b.Value
}
type Base struct {
Version int
CommitID int
TenantID UniqueID
Schema *etcdpb.CollectionMeta
}
type FieldData interface{}
type BoolFieldData struct {
@ -122,10 +120,14 @@ type InsertData struct {
// Blob key example:
// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
type InsertCodec struct {
Base
Schema *etcdpb.CollectionMeta
readerCloseFunc []func() error
}
func NewInsertCodec(schema *etcdpb.CollectionMeta) *InsertCodec {
return &InsertCodec{Schema: schema}
}
func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
var blobs []*Blob
var writer *InsertBinlogWriter
@ -425,20 +427,59 @@ func (insertCodec *InsertCodec) Close() error {
}
// Blob key example:
// ${tenant}/data_definition_log/${collection_id}/${field_type}/${log_idx}
// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
type DataDefinitionCodec struct {
Base
collectionID int64
readerCloseFunc []func() error
}
func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec {
return &DataDefinitionCodec{collectionID: collectionID}
}
func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.Schema.ID)
writer, err := NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.collectionID)
if err != nil {
return nil, err
}
var blobs []*Blob
eventWriter, err := writer.NextCreateCollectionEventWriter()
if err != nil {
return nil, err
}
var int64Ts []int64
for _, singleTs := range ts {
int64Ts = append(int64Ts, int64(singleTs))
}
err = eventWriter.AddInt64ToPayload(int64Ts)
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
writer.SetStartTimeStamp(ts[0])
writer.SetEndTimeStamp(ts[len(ts)-1])
err = writer.Close()
if err != nil {
return nil, err
}
buffer, err := writer.GetBuffer()
if err != nil {
return nil, err
}
blobs = append(blobs, &Blob{
Key: Ts,
Value: buffer,
})
writer, err = NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.collectionID)
if err != nil {
return nil, err
}
for pos, req := range ddRequests {
switch eventTypes[pos] {
case CreateCollectionEventType:
@ -493,46 +534,12 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
if err != nil {
return nil, err
}
buffer, err := writer.GetBuffer()
if err != nil {
return nil, err
}
blobs = append(blobs, &Blob{
Key: "",
Value: buffer,
})
writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.Schema.ID)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextCreateCollectionEventWriter()
if err != nil {
return nil, err
}
var int64Ts []int64
for _, singleTs := range ts {
int64Ts = append(int64Ts, int64(singleTs))
}
err = eventWriter.AddInt64ToPayload(int64Ts)
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
writer.SetStartTimeStamp(ts[0])
writer.SetEndTimeStamp(ts[len(ts)-1])
err = writer.Close()
if err != nil {
return nil, err
}
buffer, err = writer.GetBuffer()
if err != nil {
return nil, err
}
blobs = append(blobs, &Blob{
Key: "",
Key: DDL,
Value: buffer,
})
@ -620,7 +627,10 @@ func (dataDefinitionCodec *DataDefinitionCodec) Close() error {
//func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, error) {}
type IndexCodec struct {
Base
}
func NewIndexCodec() *IndexCodec {
return &IndexCodec{}
}
func (indexCodec *IndexCodec) Serialize(blobs []*Blob) ([]*Blob, error) {

View File

@ -10,112 +10,104 @@ import (
)
func TestInsertCodec(t *testing.T) {
base := Base{
Version: 1,
CommitID: 1,
TenantID: 1,
Schema: &etcdpb.CollectionMeta{
ID: 1,
CreateTime: 1,
SegmentIDs: []int64{0, 1},
PartitionTags: []string{"partition_0", "partition_1"},
Schema: &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 0,
Name: "row_id",
IsPrimaryKey: false,
Description: "row_id",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 1,
Name: "Ts",
IsPrimaryKey: false,
Description: "Ts",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 100,
Name: "field_bool",
IsPrimaryKey: false,
Description: "description_2",
DataType: schemapb.DataType_BOOL,
},
{
FieldID: 101,
Name: "field_int8",
IsPrimaryKey: false,
Description: "description_3",
DataType: schemapb.DataType_INT8,
},
{
FieldID: 102,
Name: "field_int16",
IsPrimaryKey: false,
Description: "description_4",
DataType: schemapb.DataType_INT16,
},
{
FieldID: 103,
Name: "field_int32",
IsPrimaryKey: false,
Description: "description_5",
DataType: schemapb.DataType_INT32,
},
{
FieldID: 104,
Name: "field_int64",
IsPrimaryKey: false,
Description: "description_6",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 105,
Name: "field_float",
IsPrimaryKey: false,
Description: "description_7",
DataType: schemapb.DataType_FLOAT,
},
{
FieldID: 106,
Name: "field_double",
IsPrimaryKey: false,
Description: "description_8",
DataType: schemapb.DataType_DOUBLE,
},
{
FieldID: 107,
Name: "field_string",
IsPrimaryKey: false,
Description: "description_9",
DataType: schemapb.DataType_STRING,
},
{
FieldID: 108,
Name: "field_binary_vector",
IsPrimaryKey: false,
Description: "description_10",
DataType: schemapb.DataType_VECTOR_BINARY,
},
{
FieldID: 109,
Name: "field_float_vector",
IsPrimaryKey: false,
Description: "description_11",
DataType: schemapb.DataType_VECTOR_FLOAT,
},
Schema := &etcdpb.CollectionMeta{
ID: 1,
CreateTime: 1,
SegmentIDs: []int64{0, 1},
PartitionTags: []string{"partition_0", "partition_1"},
Schema: &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 0,
Name: "row_id",
IsPrimaryKey: false,
Description: "row_id",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 1,
Name: "Ts",
IsPrimaryKey: false,
Description: "Ts",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 100,
Name: "field_bool",
IsPrimaryKey: false,
Description: "description_2",
DataType: schemapb.DataType_BOOL,
},
{
FieldID: 101,
Name: "field_int8",
IsPrimaryKey: false,
Description: "description_3",
DataType: schemapb.DataType_INT8,
},
{
FieldID: 102,
Name: "field_int16",
IsPrimaryKey: false,
Description: "description_4",
DataType: schemapb.DataType_INT16,
},
{
FieldID: 103,
Name: "field_int32",
IsPrimaryKey: false,
Description: "description_5",
DataType: schemapb.DataType_INT32,
},
{
FieldID: 104,
Name: "field_int64",
IsPrimaryKey: false,
Description: "description_6",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 105,
Name: "field_float",
IsPrimaryKey: false,
Description: "description_7",
DataType: schemapb.DataType_FLOAT,
},
{
FieldID: 106,
Name: "field_double",
IsPrimaryKey: false,
Description: "description_8",
DataType: schemapb.DataType_DOUBLE,
},
{
FieldID: 107,
Name: "field_string",
IsPrimaryKey: false,
Description: "description_9",
DataType: schemapb.DataType_STRING,
},
{
FieldID: 108,
Name: "field_binary_vector",
IsPrimaryKey: false,
Description: "description_10",
DataType: schemapb.DataType_VECTOR_BINARY,
},
{
FieldID: 109,
Name: "field_float_vector",
IsPrimaryKey: false,
Description: "description_11",
DataType: schemapb.DataType_VECTOR_FLOAT,
},
},
},
}
insertCodec := &InsertCodec{
base,
make([]func() error, 0),
}
insertCodec := NewInsertCodec(Schema)
insertDataFirst := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
@ -268,58 +260,7 @@ func TestInsertCodec(t *testing.T) {
assert.Nil(t, insertCodec.Close())
}
func TestDDCodec(t *testing.T) {
base := Base{
Version: 1,
CommitID: 1,
TenantID: 1,
Schema: &etcdpb.CollectionMeta{
ID: 1,
CreateTime: 1,
SegmentIDs: []int64{0, 1},
PartitionTags: []string{"partition_0", "partition_1"},
Schema: &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
Name: "field_1",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_INT32,
},
{
Name: "field_2",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_INT64,
},
{
Name: "field_3",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_STRING,
},
{
Name: "field_3",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_STRING,
},
{
Name: "field_3",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_STRING,
},
},
},
},
}
dataDefinitionCodec := &DataDefinitionCodec{
base,
make([]func() error, 0),
}
dataDefinitionCodec := NewDataDefinitionCodec(int64(1))
ts := []Timestamp{
1,
2,
@ -351,9 +292,7 @@ func TestDDCodec(t *testing.T) {
}
func TestIndexCodec(t *testing.T) {
indexCodec := &IndexCodec{
Base{},
}
indexCodec := NewIndexCodec()
blobs := []*Blob{
{
"12345",