2021-04-19 03:32:24 +00:00
|
|
|
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
|
2020-12-09 01:55:56 +00:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"encoding/binary"
|
2021-07-07 11:10:07 +00:00
|
|
|
"fmt"
|
2021-03-05 02:15:27 +00:00
|
|
|
|
2021-04-22 06:45:57 +00:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
2020-12-09 01:55:56 +00:00
|
|
|
)
|
|
|
|
|
2021-09-18 14:55:51 +00:00
|
|
|
// BinlogType is to distinguish different files saving different data.
|
2020-12-09 01:55:56 +00:00
|
|
|
type BinlogType int32
|
|
|
|
|
|
|
|
const (
|
|
|
|
InsertBinlog BinlogType = iota
|
|
|
|
DeleteBinlog
|
|
|
|
DDLBinlog
|
2021-09-29 01:52:12 +00:00
|
|
|
IndexFileBinlog
|
2020-12-09 01:55:56 +00:00
|
|
|
)
|
|
|
|
const (
|
2020-12-10 06:52:42 +00:00
|
|
|
MagicNumber int32 = 0xfffabc
|
2020-12-09 01:55:56 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type baseBinlogWriter struct {
|
|
|
|
descriptorEvent
|
2020-12-10 07:50:09 +00:00
|
|
|
magicNumber int32
|
|
|
|
binlogType BinlogType
|
|
|
|
eventWriters []EventWriter
|
|
|
|
buffer *bytes.Buffer
|
|
|
|
length int32
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
|
|
|
|
2020-12-10 07:50:09 +00:00
|
|
|
func (writer *baseBinlogWriter) isClosed() bool {
|
|
|
|
return writer.buffer != nil
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (writer *baseBinlogWriter) GetEventNums() int32 {
|
2020-12-10 07:50:09 +00:00
|
|
|
return int32(len(writer.eventWriters))
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (writer *baseBinlogWriter) GetRowNums() (int32, error) {
|
2020-12-10 07:50:09 +00:00
|
|
|
if writer.isClosed() {
|
|
|
|
return writer.length, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
length := 0
|
|
|
|
for _, e := range writer.eventWriters {
|
|
|
|
rows, err := e.GetPayloadLengthFromWriter()
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
2020-12-10 07:50:09 +00:00
|
|
|
return 0, err
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
length += rows
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
return int32(length), nil
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (writer *baseBinlogWriter) GetBinlogType() BinlogType {
|
|
|
|
return writer.binlogType
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetBuffer get binlog buffer. Return nil if binlog is not finished yet.
|
2020-12-10 07:50:09 +00:00
|
|
|
func (writer *baseBinlogWriter) GetBuffer() ([]byte, error) {
|
|
|
|
if writer.buffer == nil {
|
2021-07-07 11:10:07 +00:00
|
|
|
return nil, fmt.Errorf("please close binlog before get buffer")
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
return writer.buffer.Bytes(), nil
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Close allocate buffer and release resource
|
|
|
|
func (writer *baseBinlogWriter) Close() error {
|
2020-12-10 07:50:09 +00:00
|
|
|
if writer.buffer != nil {
|
2020-12-09 01:55:56 +00:00
|
|
|
return nil
|
|
|
|
}
|
2021-07-07 11:10:07 +00:00
|
|
|
if writer.StartTimestamp == 0 || writer.EndTimestamp == 0 {
|
|
|
|
return fmt.Errorf("invalid start/end timestamp")
|
2020-12-11 03:29:07 +00:00
|
|
|
}
|
|
|
|
|
2021-07-07 11:10:07 +00:00
|
|
|
var offset int32 = 0
|
2020-12-09 01:55:56 +00:00
|
|
|
writer.buffer = new(bytes.Buffer)
|
|
|
|
if err := binary.Write(writer.buffer, binary.LittleEndian, int32(MagicNumber)); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-07-07 11:10:07 +00:00
|
|
|
offset += int32(binary.Size(MagicNumber))
|
2020-12-09 01:55:56 +00:00
|
|
|
if err := writer.descriptorEvent.Write(writer.buffer); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-07-07 11:10:07 +00:00
|
|
|
offset += writer.descriptorEvent.GetMemoryUsageInBytes()
|
|
|
|
|
2020-12-10 07:50:09 +00:00
|
|
|
writer.length = 0
|
2020-12-09 01:55:56 +00:00
|
|
|
for _, w := range writer.eventWriters {
|
2020-12-10 07:50:09 +00:00
|
|
|
w.SetOffset(offset)
|
|
|
|
if err := w.Finish(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-12-09 01:55:56 +00:00
|
|
|
if err := w.Write(writer.buffer); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
length, err := w.GetMemoryUsageInBytes()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
offset += length
|
|
|
|
rows, err := w.GetPayloadLengthFromWriter()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
writer.length += int32(rows)
|
|
|
|
if err := w.ReleasePayloadWriter(); err != nil {
|
2020-12-09 01:55:56 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// InsertBinlogWriter is an object to write binlog file which saves insert data.
|
2020-12-09 01:55:56 +00:00
|
|
|
type InsertBinlogWriter struct {
|
|
|
|
baseBinlogWriter
|
|
|
|
}
|
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// NextInsertEventWriter returns an event writer to write insert data to an event.
|
2020-12-09 01:55:56 +00:00
|
|
|
func (writer *InsertBinlogWriter) NextInsertEventWriter() (*insertEventWriter, error) {
|
2020-12-10 07:50:09 +00:00
|
|
|
if writer.isClosed() {
|
2021-07-07 11:10:07 +00:00
|
|
|
return nil, fmt.Errorf("binlog has closed")
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
event, err := newInsertEventWriter(writer.PayloadDataType)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
writer.eventWriters = append(writer.eventWriters, event)
|
2020-12-09 01:55:56 +00:00
|
|
|
return event, nil
|
|
|
|
}
|
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// DeleteBinlogWriter is an object to write binlog file which saves delete data.
|
2020-12-09 01:55:56 +00:00
|
|
|
type DeleteBinlogWriter struct {
|
|
|
|
baseBinlogWriter
|
|
|
|
}
|
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// NextDeleteEventWriter returns an event writer to write delete data to an event.
|
2020-12-09 01:55:56 +00:00
|
|
|
func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) {
|
2020-12-10 07:50:09 +00:00
|
|
|
if writer.isClosed() {
|
2021-07-07 11:10:07 +00:00
|
|
|
return nil, fmt.Errorf("binlog has closed")
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
event, err := newDeleteEventWriter(writer.PayloadDataType)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
writer.eventWriters = append(writer.eventWriters, event)
|
2020-12-09 01:55:56 +00:00
|
|
|
return event, nil
|
|
|
|
}
|
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// DDLBinlogWriter is an object to write binlog file which saves ddl information.
|
2020-12-09 01:55:56 +00:00
|
|
|
type DDLBinlogWriter struct {
|
|
|
|
baseBinlogWriter
|
|
|
|
}
|
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// NextCreateCollectionEventWriter returns an event writer to write CreateCollection
|
|
|
|
// information to an event.
|
2020-12-09 01:55:56 +00:00
|
|
|
func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) {
|
2020-12-10 07:50:09 +00:00
|
|
|
if writer.isClosed() {
|
2021-07-07 11:10:07 +00:00
|
|
|
return nil, fmt.Errorf("binlog has closed")
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
event, err := newCreateCollectionEventWriter(writer.PayloadDataType)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
writer.eventWriters = append(writer.eventWriters, event)
|
2020-12-09 01:55:56 +00:00
|
|
|
return event, nil
|
|
|
|
}
|
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// NextDropCollectionEventWriter returns an event writer to write DropCollection
|
|
|
|
// information to an event.
|
2020-12-09 01:55:56 +00:00
|
|
|
func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) {
|
2020-12-10 07:50:09 +00:00
|
|
|
if writer.isClosed() {
|
2021-07-07 11:10:07 +00:00
|
|
|
return nil, fmt.Errorf("binlog has closed")
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
event, err := newDropCollectionEventWriter(writer.PayloadDataType)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
writer.eventWriters = append(writer.eventWriters, event)
|
2020-12-09 01:55:56 +00:00
|
|
|
return event, nil
|
|
|
|
}
|
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// NextCreatePartitionEventWriter returns an event writer to write CreatePartition
|
|
|
|
// information to an event.
|
2020-12-09 01:55:56 +00:00
|
|
|
func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) {
|
2020-12-10 07:50:09 +00:00
|
|
|
if writer.isClosed() {
|
2021-07-07 11:10:07 +00:00
|
|
|
return nil, fmt.Errorf("binlog has closed")
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
event, err := newCreatePartitionEventWriter(writer.PayloadDataType)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
writer.eventWriters = append(writer.eventWriters, event)
|
2020-12-09 01:55:56 +00:00
|
|
|
return event, nil
|
|
|
|
}
|
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// NextDropPartitionEventWriter returns an event writer to write DropPartition
|
|
|
|
// information to an event.
|
2020-12-09 01:55:56 +00:00
|
|
|
func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) {
|
2020-12-10 07:50:09 +00:00
|
|
|
if writer.isClosed() {
|
2021-07-07 11:10:07 +00:00
|
|
|
return nil, fmt.Errorf("binlog has closed")
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
event, err := newDropPartitionEventWriter(writer.PayloadDataType)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-12-10 07:50:09 +00:00
|
|
|
writer.eventWriters = append(writer.eventWriters, event)
|
2020-12-09 01:55:56 +00:00
|
|
|
return event, nil
|
|
|
|
}
|
|
|
|
|
2021-09-29 01:52:12 +00:00
|
|
|
type IndexFileBinlogWriter struct {
|
|
|
|
baseBinlogWriter
|
|
|
|
}
|
|
|
|
|
|
|
|
func (writer *IndexFileBinlogWriter) NextIndexFileEventWriter() (*indexFileEventWriter, error) {
|
|
|
|
if writer.isClosed() {
|
|
|
|
return nil, fmt.Errorf("binlog has closed")
|
|
|
|
}
|
|
|
|
event, err := newIndexFileEventWriter()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
writer.eventWriters = append(writer.eventWriters, event)
|
|
|
|
return event, nil
|
|
|
|
}
|
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// NewInsertBinlogWriter creates InsertBinlogWriter to write binlog file.
|
2021-04-19 02:36:19 +00:00
|
|
|
func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID, FieldID int64) *InsertBinlogWriter {
|
|
|
|
descriptorEvent := newDescriptorEvent()
|
2020-12-09 01:55:56 +00:00
|
|
|
descriptorEvent.PayloadDataType = dataType
|
2020-12-11 03:29:07 +00:00
|
|
|
descriptorEvent.CollectionID = collectionID
|
|
|
|
descriptorEvent.PartitionID = partitionID
|
|
|
|
descriptorEvent.SegmentID = segmentID
|
|
|
|
descriptorEvent.FieldID = FieldID
|
2020-12-09 01:55:56 +00:00
|
|
|
return &InsertBinlogWriter{
|
|
|
|
baseBinlogWriter: baseBinlogWriter{
|
2020-12-10 07:50:09 +00:00
|
|
|
descriptorEvent: *descriptorEvent,
|
|
|
|
magicNumber: MagicNumber,
|
|
|
|
binlogType: InsertBinlog,
|
|
|
|
eventWriters: make([]EventWriter, 0),
|
|
|
|
buffer: nil,
|
2020-12-09 01:55:56 +00:00
|
|
|
},
|
2020-12-09 03:18:13 +00:00
|
|
|
}
|
2021-04-19 02:36:19 +00:00
|
|
|
}
|
2021-07-07 11:10:07 +00:00
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// NewDeleteBinlogWriter creates DeleteBinlogWriter to write binlog file.
|
2021-09-28 06:30:02 +00:00
|
|
|
func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64) *DeleteBinlogWriter {
|
2021-04-19 02:36:19 +00:00
|
|
|
descriptorEvent := newDescriptorEvent()
|
2020-12-09 01:55:56 +00:00
|
|
|
descriptorEvent.PayloadDataType = dataType
|
2020-12-12 09:10:42 +00:00
|
|
|
descriptorEvent.CollectionID = collectionID
|
2021-09-28 06:30:02 +00:00
|
|
|
descriptorEvent.PartitionID = partitionID
|
|
|
|
descriptorEvent.SegmentID = segmentID
|
2020-12-09 01:55:56 +00:00
|
|
|
return &DeleteBinlogWriter{
|
|
|
|
baseBinlogWriter: baseBinlogWriter{
|
2020-12-10 07:50:09 +00:00
|
|
|
descriptorEvent: *descriptorEvent,
|
|
|
|
magicNumber: MagicNumber,
|
|
|
|
binlogType: DeleteBinlog,
|
|
|
|
eventWriters: make([]EventWriter, 0),
|
|
|
|
buffer: nil,
|
2020-12-09 01:55:56 +00:00
|
|
|
},
|
2020-12-09 03:18:13 +00:00
|
|
|
}
|
2021-04-19 02:36:19 +00:00
|
|
|
}
|
2021-07-07 11:10:07 +00:00
|
|
|
|
2021-09-18 10:39:56 +00:00
|
|
|
// NewDDLBinlogWriter creates DDLBinlogWriter to write binlog file.
|
2021-04-19 02:36:19 +00:00
|
|
|
func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinlogWriter {
|
|
|
|
descriptorEvent := newDescriptorEvent()
|
2020-12-09 01:55:56 +00:00
|
|
|
descriptorEvent.PayloadDataType = dataType
|
2020-12-12 09:10:42 +00:00
|
|
|
descriptorEvent.CollectionID = collectionID
|
2020-12-09 01:55:56 +00:00
|
|
|
return &DDLBinlogWriter{
|
|
|
|
baseBinlogWriter: baseBinlogWriter{
|
2020-12-10 07:50:09 +00:00
|
|
|
descriptorEvent: *descriptorEvent,
|
|
|
|
magicNumber: MagicNumber,
|
|
|
|
binlogType: DDLBinlog,
|
|
|
|
eventWriters: make([]EventWriter, 0),
|
|
|
|
buffer: nil,
|
2020-12-09 01:55:56 +00:00
|
|
|
},
|
2021-04-19 02:36:19 +00:00
|
|
|
}
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2021-09-29 01:52:12 +00:00
|
|
|
|
|
|
|
func NewIndexFileBinlogWriter(
|
|
|
|
indexBuildID UniqueID,
|
|
|
|
version int64,
|
|
|
|
collectionID UniqueID,
|
|
|
|
partitionID UniqueID,
|
|
|
|
segmentID UniqueID,
|
|
|
|
fieldID UniqueID,
|
|
|
|
indexName string,
|
|
|
|
indexID UniqueID,
|
|
|
|
key string,
|
|
|
|
) *IndexFileBinlogWriter {
|
|
|
|
descriptorEvent := newDescriptorEvent()
|
|
|
|
descriptorEvent.CollectionID = collectionID
|
|
|
|
descriptorEvent.PartitionID = partitionID
|
|
|
|
descriptorEvent.SegmentID = segmentID
|
|
|
|
descriptorEvent.FieldID = fieldID
|
|
|
|
descriptorEvent.PayloadDataType = schemapb.DataType_String
|
|
|
|
descriptorEvent.AddExtra("indexBuildID", fmt.Sprintf("%d", indexBuildID))
|
|
|
|
descriptorEvent.AddExtra("version", fmt.Sprintf("%d", version))
|
|
|
|
descriptorEvent.AddExtra("indexName", indexName)
|
|
|
|
descriptorEvent.AddExtra("indexID", fmt.Sprintf("%d", indexID))
|
|
|
|
descriptorEvent.AddExtra("key", key)
|
|
|
|
return &IndexFileBinlogWriter{
|
|
|
|
baseBinlogWriter: baseBinlogWriter{
|
|
|
|
descriptorEvent: *descriptorEvent,
|
|
|
|
magicNumber: MagicNumber,
|
|
|
|
binlogType: IndexFileBinlog,
|
|
|
|
eventWriters: make([]EventWriter, 0),
|
|
|
|
buffer: nil,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|