Store original memory size of binlog file to extra information (#9628)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/9649/head
dragondriver 2021-10-11 17:28:30 +08:00 committed by GitHub
parent 0d49316103
commit 9a7a060484
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 171 additions and 5 deletions

View File

@ -56,6 +56,8 @@ func TestInsertBinlog(t *testing.T) {
w.SetEventTimeStamp(1000, 2000)
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
_, err = w.GetBuffer()
assert.NotNil(t, err)
@ -146,7 +148,15 @@ func TestInsertBinlog(t *testing.T) {
multiBytes[i] = singleByte
pos++
}
assert.Equal(t, string(multiBytes), "{\"test\":\"testExtra\"}")
var extra map[string]interface{}
err = json.Unmarshal(multiBytes, &extra)
assert.NoError(t, err)
testExtra, ok := extra["test"]
assert.True(t, ok)
assert.Equal(t, "testExtra", fmt.Sprintf("%v", testExtra))
size, ok := extra[originalSizeKey]
assert.True(t, ok)
assert.Equal(t, fmt.Sprintf("%v", sizeTotal), fmt.Sprintf("%v", size))
//start of e1
assert.Equal(t, pos, int(descNxtPos))
@ -293,6 +303,8 @@ func TestDeleteBinlog(t *testing.T) {
w.SetEventTimeStamp(1000, 2000)
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
_, err = w.GetBuffer()
assert.NotNil(t, err)
@ -383,7 +395,15 @@ func TestDeleteBinlog(t *testing.T) {
multiBytes[i] = singleByte
pos++
}
assert.Equal(t, string(multiBytes), "{\"test\":\"testExtra\"}")
var extra map[string]interface{}
err = json.Unmarshal(multiBytes, &extra)
assert.NoError(t, err)
testExtra, ok := extra["test"]
assert.True(t, ok)
assert.Equal(t, "testExtra", fmt.Sprintf("%v", testExtra))
size, ok := extra[originalSizeKey]
assert.True(t, ok)
assert.Equal(t, fmt.Sprintf("%v", sizeTotal), fmt.Sprintf("%v", size))
//start of e1
assert.Equal(t, pos, int(descNxtPos))
@ -530,6 +550,8 @@ func TestDDLBinlog1(t *testing.T) {
w.SetEventTimeStamp(1000, 2000)
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
_, err = w.GetBuffer()
assert.NotNil(t, err)
@ -620,7 +642,15 @@ func TestDDLBinlog1(t *testing.T) {
multiBytes[i] = singleByte
pos++
}
assert.Equal(t, string(multiBytes), "{\"test\":\"testExtra\"}")
var extra map[string]interface{}
err = json.Unmarshal(multiBytes, &extra)
assert.NoError(t, err)
testExtra, ok := extra["test"]
assert.True(t, ok)
assert.Equal(t, "testExtra", fmt.Sprintf("%v", testExtra))
size, ok := extra[originalSizeKey]
assert.True(t, ok)
assert.Equal(t, fmt.Sprintf("%v", sizeTotal), fmt.Sprintf("%v", size))
//start of e1
assert.Equal(t, pos, int(descNxtPos))
@ -767,6 +797,8 @@ func TestDDLBinlog2(t *testing.T) {
w.SetEventTimeStamp(1000, 2000)
w.baseBinlogWriter.descriptorEventData.AddExtra("test", "testExtra")
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
_, err = w.GetBuffer()
assert.NotNil(t, err)
@ -857,7 +889,15 @@ func TestDDLBinlog2(t *testing.T) {
multiBytes[i] = singleByte
pos++
}
assert.Equal(t, string(multiBytes), "{\"test\":\"testExtra\"}")
var extra map[string]interface{}
err = json.Unmarshal(multiBytes, &extra)
assert.NoError(t, err)
testExtra, ok := extra["test"]
assert.True(t, ok)
assert.Equal(t, "testExtra", fmt.Sprintf("%v", testExtra))
size, ok := extra[originalSizeKey]
assert.True(t, ok)
assert.Equal(t, fmt.Sprintf("%v", sizeTotal), fmt.Sprintf("%v", size))
//start of e1
assert.Equal(t, pos, int(descNxtPos))
@ -1002,6 +1042,9 @@ func TestIndexFileBinlog(t *testing.T) {
w.SetEventTimeStamp(timestamp, timestamp)
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
@ -1094,6 +1137,7 @@ func TestIndexFileBinlog(t *testing.T) {
assert.Equal(t, fmt.Sprintf("%v", indexName), fmt.Sprintf("%v", j["indexName"]))
assert.Equal(t, fmt.Sprintf("%v", indexID), fmt.Sprintf("%v", j["indexID"]))
assert.Equal(t, fmt.Sprintf("%v", key), fmt.Sprintf("%v", j["key"]))
assert.Equal(t, fmt.Sprintf("%v", sizeTotal), fmt.Sprintf("%v", j[originalSizeKey]))
// NextIndexFileBinlogWriter after close
_, err = w.NextIndexFileEventWriter()
@ -1144,6 +1188,10 @@ func TestNewBinlogReaderError(t *testing.T) {
_, err = w.GetBuffer()
assert.NotNil(t, err)
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
err = w.Close()
assert.Nil(t, err)
@ -1170,6 +1218,9 @@ func TestNewBinlogWriterTsError(t *testing.T) {
err = w.Close()
assert.NotNil(t, err)
sizeTotal := 20 // not important
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
w.SetEventTimeStamp(1000, 0)
_, err = w.GetBuffer()
assert.NotNil(t, err)
@ -1190,6 +1241,8 @@ func TestInsertBinlogWriterCloseError(t *testing.T) {
insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40)
e1, err := insertWriter.NextInsertEventWriter()
assert.Nil(t, err)
sizeTotal := 20 // not important
insertWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err)
e1.SetEventTimestamp(100, 200)
@ -1206,6 +1259,8 @@ func TestDeleteBinlogWriteCloseError(t *testing.T) {
deleteWriter := NewDeleteBinlogWriter(schemapb.DataType_Int64, 10, 1, 1)
e1, err := deleteWriter.NextDeleteEventWriter()
assert.Nil(t, err)
sizeTotal := 20 // not important
deleteWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err)
e1.SetEventTimestamp(100, 200)
@ -1222,6 +1277,10 @@ func TestDDBinlogWriteCloseError(t *testing.T) {
ddBinlogWriter := NewDDLBinlogWriter(schemapb.DataType_Int64, 10)
e1, err := ddBinlogWriter.NextCreateCollectionEventWriter()
assert.Nil(t, err)
sizeTotal := 20 // not important
ddBinlogWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
err = e1.AddDataToPayload([]int64{1, 2, 3})
assert.Nil(t, err)
e1.SetEventTimestamp(100, 200)
@ -1303,6 +1362,8 @@ var _ EventWriter = (*testEvent)(nil)
func TestWriterListError(t *testing.T) {
insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40)
sizeTotal := 20 // not important
insertWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
errorEvent := &testEvent{}
insertWriter.eventWriters = append(insertWriter.eventWriters, errorEvent)
insertWriter.SetEventTimeStamp(1000, 2000)

View File

@ -37,6 +37,8 @@ func TestBinlogWriterReader(t *testing.T) {
nums, err := binlogWriter.GetRowNums()
assert.Nil(t, err)
assert.EqualValues(t, 3, nums)
sizeTotal := 20 // not important
binlogWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, sizeTotal)
err = binlogWriter.Close()
assert.Nil(t, err)
assert.EqualValues(t, 1, binlogWriter.GetEventNums())

View File

@ -12,6 +12,7 @@
package storage
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
@ -83,7 +84,8 @@ func (b Blob) GetValue() []byte {
return b.Value
}
type FieldData interface{}
// FieldData is the generic per-field column container handled by the insert
// codec. Concrete implementations (BoolFieldData, Int64FieldData,
// StringFieldData, FloatVectorFieldData, ...) are recovered at the point of
// use via type assertions on the schema's declared data type.
type FieldData interface {
}
type BoolFieldData struct {
NumRows []int64
@ -128,6 +130,51 @@ type FloatVectorFieldData struct {
Dim int
}
// why not binary.Size(data) directly? binary.Size(data) return -1
// binary.Size returns how many bytes Write would generate to encode the value v, which
// must be a fixed-size value or a slice of fixed-size values, or a pointer to such data.
// If v is neither of these, binary.Size returns -1.
// GetMemorySize returns the in-memory footprint in bytes: the encoded size
// of the row-count slice plus the encoded size of the bool payload. Both
// fields hold fixed-size elements, so binary.Size is well-defined here.
func (data *BoolFieldData) GetMemorySize() int {
	rowsSize := binary.Size(data.NumRows)
	payloadSize := binary.Size(data.Data)
	return rowsSize + payloadSize
}
// GetMemorySize returns the in-memory footprint in bytes of the row-count
// slice and the int8 payload combined.
func (data *Int8FieldData) GetMemorySize() int {
	total := binary.Size(data.NumRows)
	total += binary.Size(data.Data)
	return total
}
// GetMemorySize returns the in-memory footprint in bytes of the row-count
// slice and the int16 payload combined.
func (data *Int16FieldData) GetMemorySize() int {
	total := binary.Size(data.NumRows)
	total += binary.Size(data.Data)
	return total
}
// GetMemorySize returns the in-memory footprint in bytes of the row-count
// slice and the int32 payload combined.
func (data *Int32FieldData) GetMemorySize() int {
	total := binary.Size(data.NumRows)
	total += binary.Size(data.Data)
	return total
}
// GetMemorySize returns the in-memory footprint in bytes of the row-count
// slice and the int64 payload combined.
func (data *Int64FieldData) GetMemorySize() int {
	total := binary.Size(data.NumRows)
	total += binary.Size(data.Data)
	return total
}
// GetMemorySize returns the in-memory footprint in bytes of the row-count
// slice and the float32 payload combined.
func (data *FloatFieldData) GetMemorySize() int {
	rowsSize := binary.Size(data.NumRows)
	payloadSize := binary.Size(data.Data)
	return rowsSize + payloadSize
}
// GetMemorySize returns the in-memory footprint in bytes of the row-count
// slice and the float64 payload combined.
func (data *DoubleFieldData) GetMemorySize() int {
	rowsSize := binary.Size(data.NumRows)
	payloadSize := binary.Size(data.Data)
	return rowsSize + payloadSize
}
// GetMemorySize returns the in-memory footprint in bytes: the encoded size
// of the row-count slice plus the total length of all string payloads.
//
// BUG FIX: the previous implementation called binary.Size(data.Data) on the
// string slice. Strings are not fixed-size values, so binary.Size returns -1
// for them (exactly the caveat documented above these methods), making the
// result binary.Size(NumRows) - 1 instead of accounting for the payload.
// The string lengths are summed explicitly instead.
func (data *StringFieldData) GetMemorySize() int {
	size := binary.Size(data.NumRows)
	// len(s) is the byte length of the string's UTF-8 contents, which is
	// what the payload actually occupies in memory.
	for _, s := range data.Data {
		size += len(s)
	}
	return size
}
// GetMemorySize returns the in-memory footprint in bytes: the row-count
// slice, the packed binary-vector payload, and the Dim field.
func (data *BinaryVectorFieldData) GetMemorySize() int {
	total := binary.Size(data.NumRows)
	total += binary.Size(data.Data)
	total += binary.Size(data.Dim)
	return total
}
// GetMemorySize returns the in-memory footprint in bytes: the row-count
// slice, the float-vector payload, and the Dim field.
func (data *FloatVectorFieldData) GetMemorySize() int {
	total := binary.Size(data.NumRows)
	total += binary.Size(data.Data)
	total += binary.Size(data.Dim)
	return total
}
// system filed id:
// 0: unique row id
// 1: timestamp
@ -196,18 +243,25 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
switch field.DataType {
case schemapb.DataType_Bool:
err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*BoolFieldData).GetMemorySize())
case schemapb.DataType_Int8:
err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*Int8FieldData).GetMemorySize())
case schemapb.DataType_Int16:
err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*Int16FieldData).GetMemorySize())
case schemapb.DataType_Int32:
err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*Int32FieldData).GetMemorySize())
case schemapb.DataType_Int64:
err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*Int64FieldData).GetMemorySize())
case schemapb.DataType_Float:
err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*FloatFieldData).GetMemorySize())
case schemapb.DataType_Double:
err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data)
writer.AddExtra(originalSizeKey, singleData.(*DoubleFieldData).GetMemorySize())
case schemapb.DataType_String:
for _, singleString := range singleData.(*StringFieldData).Data {
err = eventWriter.AddOneStringToPayload(singleString)
@ -215,10 +269,13 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
return nil, nil, err
}
}
writer.AddExtra(originalSizeKey, singleData.(*StringFieldData).GetMemorySize())
case schemapb.DataType_BinaryVector:
err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim)
writer.AddExtra(originalSizeKey, singleData.(*BinaryVectorFieldData).GetMemorySize())
case schemapb.DataType_FloatVector:
err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim)
writer.AddExtra(originalSizeKey, singleData.(*FloatVectorFieldData).GetMemorySize())
default:
return nil, nil, fmt.Errorf("undefined data type %d", field.DataType)
}
@ -536,6 +593,7 @@ func (deleteCodec *DeleteCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
return nil, err
}
sizeTotal := 0
startTs, endTs := math.MaxInt64, math.MinInt64
for key, value := range data.Data {
if value < int64(startTs) {
@ -548,9 +606,18 @@ func (deleteCodec *DeleteCodec) Serialize(partitionID UniqueID, segmentID Unique
if err != nil {
return nil, err
}
sizeTotal += len(key)
sizeTotal += binary.Size(value)
}
eventWriter.SetEventTimestamp(uint64(startTs), uint64(endTs))
binlogWriter.SetEventTimeStamp(uint64(startTs), uint64(endTs))
// https://github.com/milvus-io/milvus/issues/9620
// It's a little complicated to count the memory size of a map.
// See: https://stackoverflow.com/questions/31847549/computing-the-memory-footprint-or-byte-length-of-a-map
// Since the implementation of golang map may differ from version, so we'd better not to use this magic method.
binlogWriter.AddExtra(originalSizeKey, sizeTotal)
err = binlogWriter.Close()
if err != nil {
return nil, err
@ -642,7 +709,12 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
}
eventWriter.SetEventTimestamp(ts[0], ts[len(ts)-1])
writer.SetEventTimeStamp(ts[0], ts[len(ts)-1])
// https://github.com/milvus-io/milvus/issues/9620
writer.AddExtra(originalSizeKey, binary.Size(int64Ts))
err = writer.Close()
if err != nil {
return nil, err
}
@ -657,7 +729,9 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
writer = NewDDLBinlogWriter(schemapb.DataType_String, dataDefinitionCodec.collectionID)
sizeTotal := 0
for pos, req := range ddRequests {
sizeTotal += len(req)
switch eventTypes[pos] {
case CreateCollectionEventType:
eventWriter, err := writer.NextCreateCollectionEventWriter()
@ -702,6 +776,10 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
}
}
writer.SetEventTimeStamp(ts[0], ts[len(ts)-1])
// https://github.com/milvus-io/milvus/issues/9620
writer.AddExtra(originalSizeKey, sizeTotal)
err = writer.Close()
if err != nil {
return nil, err
@ -850,6 +928,9 @@ func (codec *IndexFileBinlogCodec) Serialize(
writer.SetEventTimeStamp(ts, ts)
// https://github.com/milvus-io/milvus/issues/9620
writer.AddExtra(originalSizeKey, len(datas[pos].Value))
err = writer.Close()
if err != nil {
return nil, err
@ -892,6 +973,10 @@ func (codec *IndexFileBinlogCodec) Serialize(
writer.SetEventTimeStamp(ts, ts)
// https://github.com/milvus-io/milvus/issues/9620
// len(params) is also not accurate, indexParams is a map
writer.AddExtra(originalSizeKey, len(params))
err = writer.Close()
if err != nil {
return nil, err

View File

@ -15,12 +15,15 @@ import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// originalSizeKey is the key in the descriptor event's extra-info map under
// which every binlog writer records the original (pre-serialization, in-memory)
// size of the payload. descriptorEventData.FinishExtra returns an error when
// this key has not been set, so all writers must call AddExtra with it before
// closing.
const originalSizeKey = "original_size"
type descriptorEventData struct {
DescriptorEventDataFixPart
ExtraLength int32
@ -66,6 +69,13 @@ func (data *descriptorEventData) AddExtra(k string, v interface{}) {
// Call before GetMemoryUsageInBytes to get a accurate length of description event.
func (data *descriptorEventData) FinishExtra() error {
var err error
// keep all binlog file records the original size
_, ok := data.Extras[originalSizeKey]
if !ok {
return fmt.Errorf("%v not in extra", originalSizeKey)
}
data.ExtraBytes, err = json.Marshal(data.Extras)
if err != nil {
return err

View File

@ -52,6 +52,12 @@ func TestDescriptorEvent(t *testing.T) {
var buf bytes.Buffer
err := desc.Write(&buf)
assert.NotNil(t, err)
sizeTotal := 20 // not important
desc.AddExtra(originalSizeKey, sizeTotal)
err = desc.Write(&buf)
assert.Nil(t, err)
buffer := buf.Bytes()

View File

@ -59,6 +59,8 @@ func TestPrintBinlogFilesInt64(t *testing.T) {
_, err = w.GetBuffer()
assert.NotNil(t, err)
sizeTotal := 20 // not important
w.AddExtra(originalSizeKey, sizeTotal)
err = w.Close()
assert.Nil(t, err)
buf, err := w.GetBuffer()