Close payload writer when error occurs (#15013)

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>
pull/15029/head
godchen 2022-01-07 18:27:23 +08:00 committed by GitHub
parent 7f47ef0244
commit 384ceea223
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 159 additions and 12 deletions

View File

@ -19,6 +19,9 @@ package storage
import (
"bytes"
"errors"
"runtime"
"github.com/milvus-io/milvus/internal/log"
)
// BinlogReader is an object to read binlog file. Binlog file's format can be
@ -88,5 +91,10 @@ func NewBinlogReader(data []byte) (*BinlogReader, error) {
if _, err := reader.readDescriptorEvent(); err != nil {
return nil, err
}
runtime.SetFinalizer(reader, func(reader *BinlogReader) {
if !reader.isClose {
log.Error("binlog reader is leaking.. please check")
}
})
return reader, nil
}

View File

@ -20,8 +20,10 @@ import (
"bytes"
"encoding/binary"
"fmt"
"runtime"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
@ -263,7 +265,8 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
descriptorEvent.PartitionID = partitionID
descriptorEvent.SegmentID = segmentID
descriptorEvent.FieldID = FieldID
return &InsertBinlogWriter{
w := &InsertBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,
magicNumber: MagicNumber,
@ -272,6 +275,13 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
buffer: nil,
},
}
runtime.SetFinalizer(w, func(writer *InsertBinlogWriter) {
if !w.isClosed() {
log.Error("insert binlog writer is leaking.. please check")
}
})
return w
}
// NewDeleteBinlogWriter creates DeleteBinlogWriter to write binlog file.
@ -281,7 +291,7 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
descriptorEvent.CollectionID = collectionID
descriptorEvent.PartitionID = partitionID
descriptorEvent.SegmentID = segmentID
return &DeleteBinlogWriter{
w := &DeleteBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,
magicNumber: MagicNumber,
@ -290,6 +300,12 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
buffer: nil,
},
}
runtime.SetFinalizer(w, func(writer *DeleteBinlogWriter) {
if !w.isClosed() {
log.Error("delete binlog writer is leaking.. please check")
}
})
return w
}
// NewDDLBinlogWriter creates DDLBinlogWriter to write binlog file.
@ -297,7 +313,7 @@ func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinl
descriptorEvent := newDescriptorEvent()
descriptorEvent.PayloadDataType = dataType
descriptorEvent.CollectionID = collectionID
return &DDLBinlogWriter{
w := &DDLBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,
magicNumber: MagicNumber,
@ -306,6 +322,12 @@ func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinl
buffer: nil,
},
}
runtime.SetFinalizer(w, func(writer *DDLBinlogWriter) {
if !w.isClosed() {
log.Error("ddl binlog writer is leaking.. please check")
}
})
return w
}
// NewIndexFileBinlogWriter returns a new IndexFileBinlogWriter with provided parameters
@ -331,7 +353,7 @@ func NewIndexFileBinlogWriter(
descriptorEvent.AddExtra("indexName", indexName)
descriptorEvent.AddExtra("indexID", fmt.Sprintf("%d", indexID))
descriptorEvent.AddExtra("key", key)
return &IndexFileBinlogWriter{
w := &IndexFileBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
descriptorEvent: *descriptorEvent,
magicNumber: MagicNumber,
@ -340,4 +362,10 @@ func NewIndexFileBinlogWriter(
buffer: nil,
},
}
runtime.SetFinalizer(w, func(writer *IndexFileBinlogWriter) {
if !w.isClosed() {
log.Error("index file binlog writer is leaking.. please check")
}
})
return w
}

View File

@ -298,6 +298,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID)
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
writer.Close()
return nil, nil, err
}
@ -305,38 +306,85 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
switch field.DataType {
case schemapb.DataType_Bool:
err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BoolFieldData).GetMemorySize()))
case schemapb.DataType_Int8:
err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int8FieldData).GetMemorySize()))
case schemapb.DataType_Int16:
err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int16FieldData).GetMemorySize()))
case schemapb.DataType_Int32:
err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int32FieldData).GetMemorySize()))
case schemapb.DataType_Int64:
err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int64FieldData).GetMemorySize()))
case schemapb.DataType_Float:
err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatFieldData).GetMemorySize()))
case schemapb.DataType_Double:
err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*DoubleFieldData).GetMemorySize()))
case schemapb.DataType_String:
for _, singleString := range singleData.(*StringFieldData).Data {
err = eventWriter.AddOneStringToPayload(singleString)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*StringFieldData).GetMemorySize()))
case schemapb.DataType_BinaryVector:
err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BinaryVectorFieldData).GetMemorySize()))
case schemapb.DataType_FloatVector:
err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatVectorFieldData).GetMemorySize()))
default:
return nil, nil, fmt.Errorf("undefined data type %d", field.DataType)
@ -348,11 +396,15 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
err = writer.Finish()
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
buffer, err := writer.GetBuffer()
if err != nil {
eventWriter.Close()
writer.Close()
return nil, nil, err
}
blobKey := fmt.Sprintf("%d", field.FieldID)
@ -693,6 +745,12 @@ func NewDeleteCodec() *DeleteCodec {
func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, data *DeleteData) (*Blob, error) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, collectionID, partitionID, segmentID)
eventWriter, err := binlogWriter.NextDeleteEventWriter()
if err != nil {
binlogWriter.Close()
return nil, err
}
defer binlogWriter.Close()
defer eventWriter.Close()
if err != nil {
return nil, err
}
@ -739,8 +797,6 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
blob := &Blob{
Value: buffer,
}
eventWriter.Close()
binlogWriter.Close()
return blob, nil
}
@ -834,10 +890,16 @@ func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec {
// It returns blobs in the end.
func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
writer := NewDDLBinlogWriter(schemapb.DataType_Int64, dataDefinitionCodec.collectionID)
eventWriter, err := writer.NextCreateCollectionEventWriter()
if err != nil {
writer.Close()
return nil, err
}
defer writer.Close()
defer eventWriter.Close()
var blobs []*Blob
eventWriter, err := writer.NextCreateCollectionEventWriter()
if err != nil {
return nil, err
}
@ -936,8 +998,6 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
Key: DDL,
Value: buffer,
})
eventWriter.Close()
writer.Close()
return blobs, nil
}
@ -1048,11 +1108,14 @@ func (codec *IndexFileBinlogCodec) Serialize(
eventWriter, err := writer.NextIndexFileEventWriter()
if err != nil {
writer.Close()
return nil, err
}
err = eventWriter.AddByteToPayload(datas[pos].Value)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
@ -1065,10 +1128,14 @@ func (codec *IndexFileBinlogCodec) Serialize(
err = writer.Finish()
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
buffer, err := writer.GetBuffer()
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
@ -1083,11 +1150,13 @@ func (codec *IndexFileBinlogCodec) Serialize(
// save index params
writer := NewIndexFileBinlogWriter(indexBuildID, version, collectionID, partitionID, segmentID, fieldID, indexName, indexID, IndexParamsKey)
eventWriter, err := writer.NextIndexFileEventWriter()
if err != nil {
writer.Close()
return nil, err
}
defer writer.Close()
defer eventWriter.Close()
params, _ := json.Marshal(indexParams)
err = eventWriter.AddByteToPayload(params)
@ -1117,8 +1186,6 @@ func (codec *IndexFileBinlogCodec) Serialize(
//Key: strconv.Itoa(len(datas)),
Value: buffer,
})
eventWriter.Close()
writer.Close()
return blobs, nil
}

View File

@ -19,7 +19,9 @@ package storage
import (
"bytes"
"fmt"
"runtime"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
@ -108,5 +110,10 @@ func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventRea
return nil, err
}
reader.PayloadReaderInterface = payloadReader
runtime.SetFinalizer(reader, func(reader *EventReader) {
if !reader.isClosed {
log.Error("event reader is leaking.. please check")
}
})
return reader, nil
}

View File

@ -22,8 +22,10 @@ import (
"errors"
"fmt"
"io"
"runtime"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
@ -257,6 +259,11 @@ func newInsertEventWriter(dataType schemapb.DataType) (*insertEventWriter, error
}
writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataFixPartSize
writer.baseEventWriter.writeEventData = writer.insertEventData.WriteEventData
runtime.SetFinalizer(writer, func(writer *insertEventWriter) {
if !writer.isClosed {
log.Error("insert event binlog writer is leaking.. please check")
}
})
return writer, nil
}
@ -279,6 +286,11 @@ func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error
}
writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataFixPartSize
writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData
runtime.SetFinalizer(writer, func(writer *deleteEventWriter) {
if !writer.isClosed {
log.Error("delete event binlog writer is leaking.. please check")
}
})
return writer, nil
}
@ -305,6 +317,11 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollecti
}
writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataFixPartSize
writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData
runtime.SetFinalizer(writer, func(writer *createCollectionEventWriter) {
if !writer.isClosed {
log.Error("create collection event binlog writer is leaking.. please check")
}
})
return writer, nil
}
@ -331,6 +348,11 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv
}
writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataFixPartSize
writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData
runtime.SetFinalizer(writer, func(writer *dropCollectionEventWriter) {
if !writer.isClosed {
log.Error("drop collection event binlog writer is leaking.. please check")
}
})
return writer, nil
}
@ -357,6 +379,11 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartition
}
writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataFixPartSize
writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData
runtime.SetFinalizer(writer, func(writer *createPartitionEventWriter) {
if !writer.isClosed {
log.Error("create partition binlog writer is leaking.. please check")
}
})
return writer, nil
}
@ -383,6 +410,11 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
}
writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataFixPartSize
writer.baseEventWriter.writeEventData = writer.dropPartitionEventData.WriteEventData
runtime.SetFinalizer(writer, func(writer *dropPartitionEventWriter) {
if !writer.isClosed {
log.Error("drop partition event binlog writer is leaking.. please check")
}
})
return writer, nil
}
@ -405,6 +437,11 @@ func newIndexFileEventWriter() (*indexFileEventWriter, error) {
}
writer.baseEventWriter.getEventDataSize = writer.indexFileEventData.GetEventDataFixPartSize
writer.baseEventWriter.writeEventData = writer.indexFileEventData.WriteEventData
runtime.SetFinalizer(writer, func(writer *indexFileEventWriter) {
if !writer.isClosed {
log.Error("index file event binlog writer is leaking.. please check")
}
})
return writer, nil
}