Change binlog writer close behavior (#13046)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/13047/head
godchen 2021-12-09 12:37:06 +08:00 committed by GitHub
parent 8f87cc7ef1
commit febdda90f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 96 additions and 96 deletions

View File

@ -61,11 +61,13 @@ func TestInsertBinlog(t *testing.T) {
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
err = w.Finish()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
w.Close()
// magic number
magicNum := UnsafeReadInt32(buf, 0)
assert.Equal(t, magicNum, MagicNumber)
@ -306,11 +308,13 @@ func TestDeleteBinlog(t *testing.T) {
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
err = w.Finish()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
w.Close()
//magic number
magicNum := UnsafeReadInt32(buf, 0)
assert.Equal(t, magicNum, MagicNumber)
@ -551,11 +555,13 @@ func TestDDLBinlog1(t *testing.T) {
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
err = w.Finish()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
w.Close()
//magic number
magicNum := UnsafeReadInt32(buf, 0)
assert.Equal(t, magicNum, MagicNumber)
@ -796,10 +802,11 @@ func TestDDLBinlog2(t *testing.T) {
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
err = w.Finish()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
w.Close()
//magic number
magicNum := UnsafeReadInt32(buf, 0)
@ -1039,11 +1046,13 @@ func TestIndexFileBinlog(t *testing.T) {
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
err = w.Finish()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
w.Close()
//magic number
magicNum := UnsafeReadInt32(buf, 0)
assert.Equal(t, magicNum, MagicNumber)
@ -1184,11 +1193,12 @@ func TestNewBinlogReaderError(t *testing.T) {
sizeTotal := 2000000
w.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = w.Close()
err = w.Finish()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
w.Close()
reader, err = NewBinlogReader(buf)
assert.Nil(t, err)
@ -1206,7 +1216,7 @@ func TestNewBinlogWriterTsError(t *testing.T) {
_, err := w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
err = w.Finish()
assert.NotNil(t, err)
sizeTotal := 2000000
@ -1215,17 +1225,18 @@ func TestNewBinlogWriterTsError(t *testing.T) {
w.SetEventTimeStamp(1000, 0)
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
err = w.Finish()
assert.NotNil(t, err)
w.SetEventTimeStamp(1000, 2000)
_, err = w.GetBuffer()
assert.NotNil(t, err)
err = w.Close()
err = w.Finish()
assert.Nil(t, err)
_, err = w.GetBuffer()
assert.Nil(t, err)
w.Close()
}
func TestInsertBinlogWriterCloseError(t *testing.T) {
@ -1240,12 +1251,13 @@ func TestInsertBinlogWriterCloseError(t *testing.T) {
assert.Nil(t, err)
e1.SetEventTimestamp(100, 200)
insertWriter.SetEventTimeStamp(1000, 2000)
err = insertWriter.Close()
err = insertWriter.Finish()
assert.Nil(t, err)
assert.NotNil(t, insertWriter.buffer)
insertEventWriter, err := insertWriter.NextInsertEventWriter()
assert.Nil(t, insertEventWriter)
assert.NotNil(t, err)
insertWriter.Close()
}
func TestDeleteBinlogWriteCloseError(t *testing.T) {
@ -1258,12 +1270,13 @@ func TestDeleteBinlogWriteCloseError(t *testing.T) {
assert.Nil(t, err)
e1.SetEventTimestamp(100, 200)
deleteWriter.SetEventTimeStamp(1000, 2000)
err = deleteWriter.Close()
err = deleteWriter.Finish()
assert.Nil(t, err)
assert.NotNil(t, deleteWriter.buffer)
deleteEventWriter, err := deleteWriter.NextDeleteEventWriter()
assert.Nil(t, deleteEventWriter)
assert.NotNil(t, err)
deleteWriter.Close()
}
func TestDDBinlogWriteCloseError(t *testing.T) {
@ -1279,7 +1292,7 @@ func TestDDBinlogWriteCloseError(t *testing.T) {
e1.SetEventTimestamp(100, 200)
ddBinlogWriter.SetEventTimeStamp(1000, 2000)
err = ddBinlogWriter.Close()
err = ddBinlogWriter.Finish()
assert.Nil(t, err)
assert.NotNil(t, ddBinlogWriter.buffer)
@ -1298,6 +1311,8 @@ func TestDDBinlogWriteCloseError(t *testing.T) {
dropPartitionEventWriter, err := ddBinlogWriter.NextDropPartitionEventWriter()
assert.Nil(t, dropPartitionEventWriter)
assert.NotNil(t, err)
ddBinlogWriter.Close()
}
type testEvent struct {
@ -1306,7 +1321,6 @@ type testEvent struct {
writeError bool
getMemoryError bool
getPayloadLengthError bool
releasePayloadError bool
}
func (e *testEvent) Finish() error {
@ -1316,8 +1330,7 @@ func (e *testEvent) Finish() error {
return nil
}
func (e *testEvent) Close() error {
return nil
func (e *testEvent) Close() {
}
func (e *testEvent) Write(buffer *bytes.Buffer) error {
@ -1340,11 +1353,7 @@ func (e *testEvent) GetPayloadLengthFromWriter() (int, error) {
return 0, nil
}
func (e *testEvent) ReleasePayloadWriter() error {
if e.releasePayloadError {
return fmt.Errorf("releasePayload error")
}
return nil
func (e *testEvent) ReleasePayloadWriter() {
}
func (e *testEvent) SetOffset(offset int32) {
@ -1360,23 +1369,20 @@ func TestWriterListError(t *testing.T) {
errorEvent := &testEvent{}
insertWriter.eventWriters = append(insertWriter.eventWriters, errorEvent)
insertWriter.SetEventTimeStamp(1000, 2000)
errorEvent.releasePayloadError = true
err := insertWriter.Close()
assert.NotNil(t, err)
insertWriter.buffer = nil
errorEvent.getPayloadLengthError = true
err = insertWriter.Close()
err := insertWriter.Finish()
assert.NotNil(t, err)
insertWriter.buffer = nil
errorEvent.getMemoryError = true
err = insertWriter.Close()
err = insertWriter.Finish()
assert.NotNil(t, err)
insertWriter.buffer = nil
errorEvent.writeError = true
err = insertWriter.Close()
err = insertWriter.Finish()
assert.NotNil(t, err)
insertWriter.buffer = nil
errorEvent.finishError = true
err = insertWriter.Close()
err = insertWriter.Finish()
assert.NotNil(t, err)
}

View File

@ -87,7 +87,7 @@ func (writer *baseBinlogWriter) GetBuffer() ([]byte, error) {
}
// Close allocate buffer and release resource
func (writer *baseBinlogWriter) Close() error {
func (writer *baseBinlogWriter) Finish() error {
if writer.buffer != nil {
return nil
}
@ -125,13 +125,16 @@ func (writer *baseBinlogWriter) Close() error {
return err
}
writer.length += int32(rows)
if err := w.ReleasePayloadWriter(); err != nil {
return err
}
}
return nil
}
func (writer *baseBinlogWriter) Close() {
for _, e := range writer.eventWriters {
e.Close()
}
}
// InsertBinlogWriter is an object to write binlog file which saves insert data.
type InsertBinlogWriter struct {
baseBinlogWriter

View File

@ -40,7 +40,7 @@ func TestBinlogWriterReader(t *testing.T) {
assert.EqualValues(t, 3, nums)
sizeTotal := 20000000
binlogWriter.baseBinlogWriter.descriptorEventData.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = binlogWriter.Close()
err = binlogWriter.Finish()
assert.Nil(t, err)
assert.EqualValues(t, 1, binlogWriter.GetEventNums())
nums, err = binlogWriter.GetRowNums()
@ -55,6 +55,7 @@ func TestBinlogWriterReader(t *testing.T) {
buffer, err := binlogWriter.GetBuffer()
assert.Nil(t, err)
fmt.Println("reader offset : " + strconv.Itoa(len(buffer)))
binlogWriter.Close()
binlogReader, err := NewBinlogReader(buffer)
assert.Nil(t, err)

View File

@ -339,13 +339,10 @@ int GetPayloadLengthFromWriter(CPayloadWriter payloadWriter) {
}
extern "C"
CStatus ReleasePayloadWriter(CPayloadWriter handler) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
void ReleasePayloadWriter(CPayloadWriter handler) {
auto p = reinterpret_cast<wrapper::PayloadWriter *>(handler);
if (p != nullptr) delete p;
return st;
arrow::default_memory_pool()->ReleaseUnused();
}
extern "C"
@ -529,7 +526,9 @@ int GetPayloadLengthFromReader(CPayloadReader payloadReader) {
extern "C"
void ReleasePayloadReader(CPayloadReader payloadReader) {
auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader);
delete[] p->bValues;
delete p;
if (p != nullptr) {
delete[] p->bValues;
delete p;
}
arrow::default_memory_pool()->ReleaseUnused();
}

View File

@ -45,7 +45,7 @@ CStatus AddFloatVectorToPayload(CPayloadWriter payloadWriter, float *values, int
CStatus FinishPayloadWriter(CPayloadWriter payloadWriter);
CBuffer GetPayloadBufferFromWriter(CPayloadWriter payloadWriter);
int GetPayloadLengthFromWriter(CPayloadWriter payloadWriter);
CStatus ReleasePayloadWriter(CPayloadWriter handler);
void ReleasePayloadWriter(CPayloadWriter handler);
//============= payload reader ======================
typedef void *CPayloadReader;

View File

@ -335,7 +335,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
}
writer.SetEventTimeStamp(typeutil.Timestamp(startTs), typeutil.Timestamp(endTs))
err = writer.Close()
err = writer.Finish()
if err != nil {
return nil, nil, err
}
@ -349,6 +349,8 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
Key: blobKey,
Value: buffer,
})
eventWriter.Close()
writer.Close()
// stats fields
switch field.DataType {
@ -715,7 +717,7 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
// Since the implementation of golang map may differ from version, so we'd better not to use this magic method.
binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = binlogWriter.Close()
err = binlogWriter.Finish()
if err != nil {
return nil, err
}
@ -726,6 +728,8 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
blob := &Blob{
Value: buffer,
}
eventWriter.Close()
binlogWriter.Close()
return blob, nil
}
@ -840,7 +844,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
// https://github.com/milvus-io/milvus/issues/9620
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64Ts)))
err = writer.Close()
err = writer.Finish()
if err != nil {
return nil, err
@ -853,6 +857,8 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
Key: Ts,
Value: buffer,
})
eventWriter.Close()
writer.Close()
writer = NewDDLBinlogWriter(schemapb.DataType_String, dataDefinitionCodec.collectionID)
@ -907,7 +913,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
// https://github.com/milvus-io/milvus/issues/9620
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = writer.Close()
err = writer.Finish()
if err != nil {
return nil, err
}
@ -919,6 +925,8 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
Key: DDL,
Value: buffer,
})
eventWriter.Close()
writer.Close()
return blobs, nil
}
@ -1050,7 +1058,7 @@ func (codec *IndexFileBinlogCodec) Serialize(
// https://github.com/milvus-io/milvus/issues/9620
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", len(datas[pos].Value)))
err = writer.Close()
err = writer.Finish()
if err != nil {
return nil, err
}
@ -1064,6 +1072,8 @@ func (codec *IndexFileBinlogCodec) Serialize(
//Key: strconv.Itoa(pos),
Value: buffer,
})
eventWriter.Close()
writer.Close()
}
// save index params
@ -1096,7 +1106,7 @@ func (codec *IndexFileBinlogCodec) Serialize(
// len(params) is also not accurate, indexParams is a map
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", len(params)))
err = writer.Close()
err = writer.Finish()
if err != nil {
return nil, err
}
@ -1110,6 +1120,8 @@ func (codec *IndexFileBinlogCodec) Serialize(
//Key: strconv.Itoa(len(datas)),
Value: buffer,
})
eventWriter.Close()
writer.Close()
return blobs, nil
}

View File

@ -166,8 +166,7 @@ func TestInsertEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -337,8 +336,7 @@ func TestInsertEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -408,8 +406,7 @@ func TestDeleteEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -579,8 +576,7 @@ func TestDeleteEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -650,8 +646,7 @@ func TestCreateCollectionEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -695,8 +690,7 @@ func TestCreateCollectionEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -766,8 +760,7 @@ func TestDropCollectionEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -811,8 +804,7 @@ func TestDropCollectionEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -882,8 +874,7 @@ func TestCreatePartitionEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -927,8 +918,7 @@ func TestCreatePartitionEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -998,8 +988,7 @@ func TestDropPartitionEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -1043,8 +1032,7 @@ func TestDropPartitionEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -1108,8 +1096,7 @@ func TestIndexFileEvent(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
st := UnsafeReadInt64(wBuf, binary.Size(eventHeader{}))
@ -1281,8 +1268,7 @@ func TestEventClose(t *testing.T) {
var buf bytes.Buffer
err = w.Write(&buf)
assert.Nil(t, err)
err = w.Close()
assert.Nil(t, err)
w.Close()
wBuf := buf.Bytes()
r, err := newEventReader(schemapb.DataType_String, bytes.NewBuffer(wBuf))

View File

@ -111,7 +111,7 @@ type EventWriter interface {
// Finish set meta in header and no data can be added to event writer
Finish() error
// Close release resources
Close() error
Close()
// Write serialize to buffer, should call Finish first
Write(buffer *bytes.Buffer) error
GetMemoryUsageInBytes() (int32, error)
@ -170,15 +170,12 @@ func (writer *baseEventWriter) Finish() error {
return nil
}
func (writer *baseEventWriter) Close() error {
func (writer *baseEventWriter) Close() {
if !writer.isClosed {
writer.isFinish = true
writer.isClosed = true
if err := writer.ReleasePayloadWriter(); err != nil {
return err
}
writer.ReleasePayloadWriter()
}
return nil
}
func (writer *baseEventWriter) SetOffset(offset int32) {

View File

@ -55,8 +55,7 @@ func TestSizeofStruct(t *testing.T) {
func TestEventWriter(t *testing.T) {
insertEvent, err := newInsertEventWriter(schemapb.DataType_Int32)
assert.Nil(t, err)
err = insertEvent.Close()
assert.Nil(t, err)
insertEvent.Close()
insertEvent, err = newInsertEventWriter(schemapb.DataType_Int32)
assert.Nil(t, err)
@ -83,8 +82,7 @@ func TestEventWriter(t *testing.T) {
length, err = insertEvent.GetMemoryUsageInBytes()
assert.Nil(t, err)
assert.EqualValues(t, length, buffer.Len())
err = insertEvent.Close()
assert.Nil(t, err)
insertEvent.Close()
}
func TestReadMagicNumber(t *testing.T) {

View File

@ -45,8 +45,8 @@ type PayloadWriterInterface interface {
FinishPayloadWriter() error
GetPayloadBufferFromWriter() ([]byte, error)
GetPayloadLengthFromWriter() (int, error)
ReleasePayloadWriter() error
Close() error
ReleasePayloadWriter()
Close()
}
type PayloadReaderInterface interface {
@ -328,13 +328,12 @@ func (w *PayloadWriter) GetPayloadLengthFromWriter() (int, error) {
return int(length), nil
}
func (w *PayloadWriter) ReleasePayloadWriter() error {
status := C.ReleasePayloadWriter(w.payloadWriterPtr)
return HandleCStatus(&status, "ReleasePayloadWriter failed")
func (w *PayloadWriter) ReleasePayloadWriter() {
C.ReleasePayloadWriter(w.payloadWriterPtr)
}
func (w *PayloadWriter) Close() error {
return w.ReleasePayloadWriter()
func (w *PayloadWriter) Close() {
w.ReleasePayloadWriter()
}
func NewPayloadReader(colType schemapb.DataType, buf []byte) (*PayloadReader, error) {

View File

@ -340,8 +340,7 @@ func TestPayload_ReaderandWriter(t *testing.T) {
assert.Equal(t, str3, "hello3")
r.ReleasePayloadReader()
err = w.ReleasePayloadWriter()
assert.Nil(t, err)
w.ReleasePayloadWriter()
})
t.Run("TestBinaryVector", func(t *testing.T) {
@ -911,7 +910,6 @@ func TestPayload_ReaderandWriter(t *testing.T) {
_, err = w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
err = w.ReleasePayloadWriter()
assert.Nil(t, err)
w.ReleasePayloadWriter()
})
}

View File

@ -61,10 +61,11 @@ func TestPrintBinlogFilesInt64(t *testing.T) {
assert.NotNil(t, err)
sizeTotal := 20000000
w.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = w.Close()
err = w.Finish()
assert.Nil(t, err)
buf, err := w.GetBuffer()
assert.Nil(t, err)
w.Close()
fd, err := ioutil.TempFile("", "binlog_int64.db")
assert.Nil(t, err)