mirror of https://github.com/milvus-io/milvus.git
				
				
				
			
		
			
				
	
	
		
			350 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			350 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
// Licensed to the LF AI & Data foundation under one
 | 
						|
// or more contributor license agreements. See the NOTICE file
 | 
						|
// distributed with this work for additional information
 | 
						|
// regarding copyright ownership. The ASF licenses this file
 | 
						|
// to you under the Apache License, Version 2.0 (the
 | 
						|
// "License"); you may not use this file except in compliance
 | 
						|
// with the License. You may obtain a copy of the License at
 | 
						|
//
 | 
						|
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package storage
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"encoding/binary"
 | 
						|
	"fmt"
 | 
						|
 | 
						|
	"github.com/milvus-io/milvus/internal/common"
 | 
						|
	"github.com/milvus-io/milvus/internal/proto/schemapb"
 | 
						|
)
 | 
						|
 | 
						|
// BinlogType is to distinguish different files saving different data.
 | 
						|
type BinlogType int32
 | 
						|
 | 
						|
const (
 | 
						|
	// InsertBinlog BinlogType for insert data
 | 
						|
	InsertBinlog BinlogType = iota
 | 
						|
	// DeleteBinlog BinlogType for delete data
 | 
						|
	DeleteBinlog
 | 
						|
	// DDLBinlog BinlogType for DDL
 | 
						|
	DDLBinlog
 | 
						|
	// IndexFileBinlog BinlogType for index
 | 
						|
	IndexFileBinlog
 | 
						|
)
 | 
						|
const (
 | 
						|
	// MagicNumber used in binlog
 | 
						|
	MagicNumber int32 = 0xfffabc
 | 
						|
)
 | 
						|
 | 
						|
type baseBinlogWriter struct {
 | 
						|
	descriptorEvent
 | 
						|
	magicNumber  int32
 | 
						|
	binlogType   BinlogType
 | 
						|
	eventWriters []EventWriter
 | 
						|
	buffer       *bytes.Buffer
 | 
						|
	length       int32
 | 
						|
}
 | 
						|
 | 
						|
func (writer *baseBinlogWriter) isClosed() bool {
 | 
						|
	return writer.buffer != nil
 | 
						|
}
 | 
						|
 | 
						|
// GetEventNums returns the number of event writers
 | 
						|
func (writer *baseBinlogWriter) GetEventNums() int32 {
 | 
						|
	return int32(len(writer.eventWriters))
 | 
						|
}
 | 
						|
 | 
						|
// GetRowNums returns writer's number of rows
 | 
						|
func (writer *baseBinlogWriter) GetRowNums() (int32, error) {
 | 
						|
	if writer.isClosed() {
 | 
						|
		return writer.length, nil
 | 
						|
	}
 | 
						|
 | 
						|
	length := 0
 | 
						|
	for _, e := range writer.eventWriters {
 | 
						|
		rows, err := e.GetPayloadLengthFromWriter()
 | 
						|
		if err != nil {
 | 
						|
			return 0, err
 | 
						|
		}
 | 
						|
		length += rows
 | 
						|
	}
 | 
						|
	return int32(length), nil
 | 
						|
}
 | 
						|
 | 
						|
// GetBinlogType returns writer's binlogType
 | 
						|
func (writer *baseBinlogWriter) GetBinlogType() BinlogType {
 | 
						|
	return writer.binlogType
 | 
						|
}
 | 
						|
 | 
						|
// GetBuffer gets binlog buffer. Return nil if binlog is not finished yet.
 | 
						|
func (writer *baseBinlogWriter) GetBuffer() ([]byte, error) {
 | 
						|
	if writer.buffer == nil {
 | 
						|
		return nil, fmt.Errorf("please close binlog before get buffer")
 | 
						|
	}
 | 
						|
	return writer.buffer.Bytes(), nil
 | 
						|
}
 | 
						|
 | 
						|
// Finish allocates buffer and releases resource
 | 
						|
func (writer *baseBinlogWriter) Finish() error {
 | 
						|
	if writer.buffer != nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	if writer.StartTimestamp == 0 || writer.EndTimestamp == 0 {
 | 
						|
		return fmt.Errorf("invalid start/end timestamp")
 | 
						|
	}
 | 
						|
 | 
						|
	var offset int32
 | 
						|
	writer.buffer = new(bytes.Buffer)
 | 
						|
	if err := binary.Write(writer.buffer, common.Endian, MagicNumber); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	offset += int32(binary.Size(MagicNumber))
 | 
						|
	if err := writer.descriptorEvent.Write(writer.buffer); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	offset += writer.descriptorEvent.GetMemoryUsageInBytes()
 | 
						|
 | 
						|
	writer.length = 0
 | 
						|
	for _, w := range writer.eventWriters {
 | 
						|
		w.SetOffset(offset)
 | 
						|
		if err := w.Finish(); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if err := w.Write(writer.buffer); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		length, err := w.GetMemoryUsageInBytes()
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		offset += length
 | 
						|
		rows, err := w.GetPayloadLengthFromWriter()
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		writer.length += int32(rows)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (writer *baseBinlogWriter) Close() {
 | 
						|
	for _, e := range writer.eventWriters {
 | 
						|
		e.Close()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// InsertBinlogWriter is an object to write binlog file which saves insert data.
 | 
						|
type InsertBinlogWriter struct {
 | 
						|
	baseBinlogWriter
 | 
						|
}
 | 
						|
 | 
						|
// NextInsertEventWriter returns an event writer to write insert data to an event.
 | 
						|
func (writer *InsertBinlogWriter) NextInsertEventWriter() (*insertEventWriter, error) {
 | 
						|
	if writer.isClosed() {
 | 
						|
		return nil, fmt.Errorf("binlog has closed")
 | 
						|
	}
 | 
						|
	event, err := newInsertEventWriter(writer.PayloadDataType)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	writer.eventWriters = append(writer.eventWriters, event)
 | 
						|
	return event, nil
 | 
						|
}
 | 
						|
 | 
						|
// DeleteBinlogWriter is an object to write binlog file which saves delete data.
 | 
						|
type DeleteBinlogWriter struct {
 | 
						|
	baseBinlogWriter
 | 
						|
}
 | 
						|
 | 
						|
// NextDeleteEventWriter returns an event writer to write delete data to an event.
 | 
						|
func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) {
 | 
						|
	if writer.isClosed() {
 | 
						|
		return nil, fmt.Errorf("binlog has closed")
 | 
						|
	}
 | 
						|
	event, err := newDeleteEventWriter(writer.PayloadDataType)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	writer.eventWriters = append(writer.eventWriters, event)
 | 
						|
	return event, nil
 | 
						|
}
 | 
						|
 | 
						|
// DDLBinlogWriter is an object to write binlog file which saves ddl information.
 | 
						|
type DDLBinlogWriter struct {
 | 
						|
	baseBinlogWriter
 | 
						|
}
 | 
						|
 | 
						|
// NextCreateCollectionEventWriter returns an event writer to write CreateCollection
 | 
						|
// information to an event.
 | 
						|
func (writer *DDLBinlogWriter) NextCreateCollectionEventWriter() (*createCollectionEventWriter, error) {
 | 
						|
	if writer.isClosed() {
 | 
						|
		return nil, fmt.Errorf("binlog has closed")
 | 
						|
	}
 | 
						|
	event, err := newCreateCollectionEventWriter(writer.PayloadDataType)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	writer.eventWriters = append(writer.eventWriters, event)
 | 
						|
	return event, nil
 | 
						|
}
 | 
						|
 | 
						|
// NextDropCollectionEventWriter returns an event writer to write DropCollection
 | 
						|
// information to an event.
 | 
						|
func (writer *DDLBinlogWriter) NextDropCollectionEventWriter() (*dropCollectionEventWriter, error) {
 | 
						|
	if writer.isClosed() {
 | 
						|
		return nil, fmt.Errorf("binlog has closed")
 | 
						|
	}
 | 
						|
	event, err := newDropCollectionEventWriter(writer.PayloadDataType)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	writer.eventWriters = append(writer.eventWriters, event)
 | 
						|
	return event, nil
 | 
						|
}
 | 
						|
 | 
						|
// NextCreatePartitionEventWriter returns an event writer to write CreatePartition
 | 
						|
// information to an event.
 | 
						|
func (writer *DDLBinlogWriter) NextCreatePartitionEventWriter() (*createPartitionEventWriter, error) {
 | 
						|
	if writer.isClosed() {
 | 
						|
		return nil, fmt.Errorf("binlog has closed")
 | 
						|
	}
 | 
						|
	event, err := newCreatePartitionEventWriter(writer.PayloadDataType)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	writer.eventWriters = append(writer.eventWriters, event)
 | 
						|
	return event, nil
 | 
						|
}
 | 
						|
 | 
						|
// NextDropPartitionEventWriter returns an event writer to write DropPartition
 | 
						|
// information to an event.
 | 
						|
func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEventWriter, error) {
 | 
						|
	if writer.isClosed() {
 | 
						|
		return nil, fmt.Errorf("binlog has closed")
 | 
						|
	}
 | 
						|
	event, err := newDropPartitionEventWriter(writer.PayloadDataType)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	writer.eventWriters = append(writer.eventWriters, event)
 | 
						|
	return event, nil
 | 
						|
}
 | 
						|
 | 
						|
// IndexFileBinlogWriter is an object to write binlog file which saves index files
 | 
						|
type IndexFileBinlogWriter struct {
 | 
						|
	baseBinlogWriter
 | 
						|
}
 | 
						|
 | 
						|
// NextIndexFileEventWriter return next available EventWriter
 | 
						|
func (writer *IndexFileBinlogWriter) NextIndexFileEventWriter() (*indexFileEventWriter, error) {
 | 
						|
	if writer.isClosed() {
 | 
						|
		return nil, fmt.Errorf("binlog has closed")
 | 
						|
	}
 | 
						|
	event, err := newIndexFileEventWriter()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	writer.eventWriters = append(writer.eventWriters, event)
 | 
						|
	return event, nil
 | 
						|
}
 | 
						|
 | 
						|
// NewInsertBinlogWriter creates InsertBinlogWriter to write binlog file.
 | 
						|
func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID, FieldID int64) *InsertBinlogWriter {
 | 
						|
	descriptorEvent := newDescriptorEvent()
 | 
						|
	descriptorEvent.PayloadDataType = dataType
 | 
						|
	descriptorEvent.CollectionID = collectionID
 | 
						|
	descriptorEvent.PartitionID = partitionID
 | 
						|
	descriptorEvent.SegmentID = segmentID
 | 
						|
	descriptorEvent.FieldID = FieldID
 | 
						|
 | 
						|
	w := &InsertBinlogWriter{
 | 
						|
		baseBinlogWriter: baseBinlogWriter{
 | 
						|
			descriptorEvent: *descriptorEvent,
 | 
						|
			magicNumber:     MagicNumber,
 | 
						|
			binlogType:      InsertBinlog,
 | 
						|
			eventWriters:    make([]EventWriter, 0),
 | 
						|
			buffer:          nil,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	return w
 | 
						|
}
 | 
						|
 | 
						|
// NewDeleteBinlogWriter creates DeleteBinlogWriter to write binlog file.
 | 
						|
func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64) *DeleteBinlogWriter {
 | 
						|
	descriptorEvent := newDescriptorEvent()
 | 
						|
	descriptorEvent.PayloadDataType = dataType
 | 
						|
	descriptorEvent.CollectionID = collectionID
 | 
						|
	descriptorEvent.PartitionID = partitionID
 | 
						|
	descriptorEvent.SegmentID = segmentID
 | 
						|
	w := &DeleteBinlogWriter{
 | 
						|
		baseBinlogWriter: baseBinlogWriter{
 | 
						|
			descriptorEvent: *descriptorEvent,
 | 
						|
			magicNumber:     MagicNumber,
 | 
						|
			binlogType:      DeleteBinlog,
 | 
						|
			eventWriters:    make([]EventWriter, 0),
 | 
						|
			buffer:          nil,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	return w
 | 
						|
}
 | 
						|
 | 
						|
// NewDDLBinlogWriter creates DDLBinlogWriter to write binlog file.
 | 
						|
func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinlogWriter {
 | 
						|
	descriptorEvent := newDescriptorEvent()
 | 
						|
	descriptorEvent.PayloadDataType = dataType
 | 
						|
	descriptorEvent.CollectionID = collectionID
 | 
						|
	w := &DDLBinlogWriter{
 | 
						|
		baseBinlogWriter: baseBinlogWriter{
 | 
						|
			descriptorEvent: *descriptorEvent,
 | 
						|
			magicNumber:     MagicNumber,
 | 
						|
			binlogType:      DDLBinlog,
 | 
						|
			eventWriters:    make([]EventWriter, 0),
 | 
						|
			buffer:          nil,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	return w
 | 
						|
}
 | 
						|
 | 
						|
// NewIndexFileBinlogWriter returns a new IndexFileBinlogWriter with provided parameters
 | 
						|
func NewIndexFileBinlogWriter(
 | 
						|
	indexBuildID UniqueID,
 | 
						|
	version int64,
 | 
						|
	collectionID UniqueID,
 | 
						|
	partitionID UniqueID,
 | 
						|
	segmentID UniqueID,
 | 
						|
	fieldID UniqueID,
 | 
						|
	indexName string,
 | 
						|
	indexID UniqueID,
 | 
						|
	key string,
 | 
						|
) *IndexFileBinlogWriter {
 | 
						|
	descriptorEvent := newDescriptorEvent()
 | 
						|
	descriptorEvent.CollectionID = collectionID
 | 
						|
	descriptorEvent.PartitionID = partitionID
 | 
						|
	descriptorEvent.SegmentID = segmentID
 | 
						|
	descriptorEvent.FieldID = fieldID
 | 
						|
	descriptorEvent.PayloadDataType = schemapb.DataType_Int8
 | 
						|
	descriptorEvent.AddExtra("indexBuildID", fmt.Sprintf("%d", indexBuildID))
 | 
						|
	descriptorEvent.AddExtra("version", fmt.Sprintf("%d", version))
 | 
						|
	descriptorEvent.AddExtra("indexName", indexName)
 | 
						|
	descriptorEvent.AddExtra("indexID", fmt.Sprintf("%d", indexID))
 | 
						|
	descriptorEvent.AddExtra("key", key)
 | 
						|
	w := &IndexFileBinlogWriter{
 | 
						|
		baseBinlogWriter: baseBinlogWriter{
 | 
						|
			descriptorEvent: *descriptorEvent,
 | 
						|
			magicNumber:     MagicNumber,
 | 
						|
			binlogType:      IndexFileBinlog,
 | 
						|
			eventWriters:    make([]EventWriter, 0),
 | 
						|
			buffer:          nil,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	return w
 | 
						|
}
 |