Fix the way that binlog store the original size (#9681)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/9703/head
dragondriver 2021-10-11 21:02:37 +08:00 committed by GitHub
parent c620152a71
commit abb8c2b9ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 42 deletions

View File

@ -56,8 +56,8 @@ func TestInsertBinlog(t *testing.T) {
w.SetEventTimeStamp(1000, 2000)
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
_, err = w.GetBuffer()
assert.NotNil(t, err)
@ -303,8 +303,8 @@ func TestDeleteBinlog(t *testing.T) {
w.SetEventTimeStamp(1000, 2000)
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
_, err = w.GetBuffer()
assert.NotNil(t, err)
@ -550,8 +550,8 @@ func TestDDLBinlog1(t *testing.T) {
w.SetEventTimeStamp(1000, 2000)
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
_, err = w.GetBuffer()
assert.NotNil(t, err)
@ -797,8 +797,8 @@ func TestDDLBinlog2(t *testing.T) {
w.SetEventTimeStamp(1000, 2000)
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
_, err = w.GetBuffer()
assert.NotNil(t, err)
@ -1042,8 +1042,8 @@ func TestIndexFileBinlog(t *testing.T) {
w.SetEventTimeStamp(timestamp, timestamp)
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
_, err = w.GetBuffer()
assert.NotNil(t, err)
@ -1189,8 +1189,8 @@ func TestNewBinlogReaderError(t *testing.T) {
_, err = w.GetBuffer()
assert.NotNil(t, err)
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = w.Close()
assert.Nil(t, err)
@ -1218,8 +1218,8 @@ func TestNewBinlogWriterTsError(t *testing.T) {
err = w.Close()
assert.NotNil(t, err)
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
w.SetEventTimeStamp(1000, 0)
_, err = w.GetBuffer()
@ -1241,8 +1241,10 @@ func TestInsertBinlogWriterCloseError(t *testing.T) {
insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40)
e1, err := insertWriter.NextInsertEventWriter()
assert.Nil(t, err)
sizeTotal := 20 // not important
insertWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
insertWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err)
e1.SetEventTimestamp(100, 200)
@ -1259,8 +1261,8 @@ func TestDeleteBinlogWriteCloseError(t *testing.T) {
deleteWriter := NewDeleteBinlogWriter(schemapb.DataType_Int64, 10, 1, 1)
e1, err := deleteWriter.NextDeleteEventWriter()
assert.Nil(t, err)
sizeTotal := 20 // not important
deleteWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
deleteWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err)
e1.SetEventTimestamp(100, 200)
@ -1278,8 +1280,8 @@ func TestDDBinlogWriteCloseError(t *testing.T) {
e1, err := ddBinlogWriter.NextCreateCollectionEventWriter()
assert.Nil(t, err)
sizeTotal := 20 // not important
ddBinlogWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
ddBinlogWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err)
@ -1362,8 +1364,8 @@ var _ EventWriter = (*testEvent)(nil)
func TestWriterListError(t *testing.T) {
insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40)
sizeTotal := 20 // not important
insertWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 2000000
insertWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
errorEvent := &testEvent{}
insertWriter.eventWriters = append(insertWriter.eventWriters, errorEvent)
insertWriter.SetEventTimeStamp(1000, 2000)

View File

@ -37,8 +37,8 @@ func TestBinlogWriterReader(t *testing.T) {
nums, err := binlogWriter.GetRowNums()
assert.Nil(t, err)
assert.EqualValues(t, 3, nums)
sizeTotal := 20 // not important
binlogWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 20000000
binlogWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = binlogWriter.Close()
assert.Nil(t, err)
assert.EqualValues(t, 1, binlogWriter.GetEventNums())

View File

@ -243,25 +243,25 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
switch field.DataType {
case schemapb.DataType_Bool:
err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*BoolFieldData).GetMemorySize())
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BoolFieldData).GetMemorySize()))
case schemapb.DataType_Int8:
err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*Int8FieldData).GetMemorySize())
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int8FieldData).GetMemorySize()))
case schemapb.DataType_Int16:
err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*Int16FieldData).GetMemorySize())
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int16FieldData).GetMemorySize()))
case schemapb.DataType_Int32:
err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*Int32FieldData).GetMemorySize())
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int32FieldData).GetMemorySize()))
case schemapb.DataType_Int64:
err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*Int64FieldData).GetMemorySize())
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int64FieldData).GetMemorySize()))
case schemapb.DataType_Float:
err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*FloatFieldData).GetMemorySize())
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatFieldData).GetMemorySize()))
case schemapb.DataType_Double:
err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*DoubleFieldData).GetMemorySize())
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*DoubleFieldData).GetMemorySize()))
case schemapb.DataType_String:
for _, singleString := range singleData.(*StringFieldData).Data {
err = eventWriter.AddOneStringToPayload(singleString)
@ -269,13 +269,13 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
return nil, nil, err
}
}
writer.AddExtra(originalSizeKey, singleData.(*StringFieldData).GetMemorySize())
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*StringFieldData).GetMemorySize()))
case schemapb.DataType_BinaryVector:
err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim)
writer.AddExtra(originalSizeKey, singleData.(*BinaryVectorFieldData).GetMemorySize())
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BinaryVectorFieldData).GetMemorySize()))
case schemapb.DataType_FloatVector:
err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim)
writer.AddExtra(originalSizeKey, singleData.(*FloatVectorFieldData).GetMemorySize())
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatVectorFieldData).GetMemorySize()))
default:
return nil, nil, fmt.Errorf("undefined data type %d", field.DataType)
}
@ -616,7 +616,7 @@ func (deleteCodec *DeleteCodec) Serialize(partitionID UniqueID, segmentID Unique
// It's a little complicated to count the memory size of a map.
// See: https://stackoverflow.com/questions/31847549/computing-the-memory-footprint-or-byte-length-of-a-map
// Since the implementation of golang map may differ from version, so we'd better not to use this magic method.
binlogWriter.AddExtra(originalSizeKey, sizeTotal)
binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = binlogWriter.Close()
if err != nil {
@ -711,7 +711,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
writer.SetEventTimeStamp(ts[0], ts[len(ts)-1])
// https://github.com/milvus-io/milvus/issues/9620
writer.AddExtra(originalSizeKey, binary.Size(int64Ts))
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64Ts)))
err = writer.Close()
@ -778,7 +778,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
writer.SetEventTimeStamp(ts[0], ts[len(ts)-1])
// https://github.com/milvus-io/milvus/issues/9620
writer.AddExtra(originalSizeKey, sizeTotal)
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = writer.Close()
if err != nil {
@ -929,7 +929,7 @@ func (codec *IndexFileBinlogCodec) Serialize(
writer.SetEventTimeStamp(ts, ts)
// https://github.com/milvus-io/milvus/issues/9620
writer.AddExtra(originalSizeKey, len(datas[pos].Value))
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", len(datas[pos].Value)))
err = writer.Close()
if err != nil {
@ -975,7 +975,7 @@ func (codec *IndexFileBinlogCodec) Serialize(
// https://github.com/milvus-io/milvus/issues/9620
// len(params) is also not accurate, indexParams is a map
writer.AddExtra(originalSizeKey, len(params))
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", len(params)))
err = writer.Close()
if err != nil {

View File

@ -17,6 +17,7 @@ import (
"errors"
"fmt"
"io"
"strconv"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -71,10 +72,20 @@ func (data *descriptorEventData) FinishExtra() error {
var err error
// keep all binlog file records the original size
_, ok := data.Extras[originalSizeKey]
sizeStored, ok := data.Extras[originalSizeKey]
if !ok {
return fmt.Errorf("%v not in extra", originalSizeKey)
}
// if we store a large int directly, golang will use scientific notation, we then will get a float value.
// so it's better to store the original size in string format.
sizeStr, ok := sizeStored.(string)
if !ok {
return fmt.Errorf("value of %v must in string format", originalSizeKey)
}
_, err = strconv.Atoi(sizeStr)
if err != nil {
return fmt.Errorf("value of %v must be able to be converted into int format", originalSizeKey)
}
data.ExtraBytes, err = json.Marshal(data.Extras)
if err != nil {

View File

@ -14,6 +14,7 @@ package storage
import (
"bytes"
"encoding/binary"
"fmt"
"testing"
"time"
"unsafe"
@ -57,6 +58,17 @@ func TestDescriptorEvent(t *testing.T) {
sizeTotal := 20 // not important
desc.AddExtra(originalSizeKey, sizeTotal)
// original size not in string format
err = desc.Write(&buf)
assert.NotNil(t, err)
desc.AddExtra(originalSizeKey, "not in int format")
err = desc.Write(&buf)
assert.NotNil(t, err)
desc.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = desc.Write(&buf)
assert.Nil(t, err)

View File

@ -59,8 +59,8 @@ func TestPrintBinlogFilesInt64(t *testing.T) {
_, err = w.GetBuffer()
assert.NotNil(t, err)
sizeTotal := 20 // not important
w.AddExtra(originalSizeKey, sizeTotal)
sizeTotal := 20000000
w.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = w.Close()
assert.Nil(t, err)
buf, err := w.GetBuffer()