2021-12-23 04:07:10 +00:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
2021-04-19 03:32:24 +00:00
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
2021-12-23 04:07:10 +00:00
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
2021-04-19 03:32:24 +00:00
|
|
|
//
|
2021-12-23 04:07:10 +00:00
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2021-04-19 03:32:24 +00:00
|
|
|
|
2020-12-09 01:55:56 +00:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"encoding/binary"
|
2021-07-07 11:10:07 +00:00
|
|
|
"io"
|
2021-03-05 02:15:27 +00:00
|
|
|
|
2023-02-26 03:31:49 +00:00
|
|
|
"github.com/cockroachdb/errors"
|
|
|
|
|
2023-06-08 17:28:37 +00:00
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
2023-04-06 11:14:32 +00:00
|
|
|
"github.com/milvus-io/milvus/pkg/common"
|
2020-12-09 01:55:56 +00:00
|
|
|
)
|
|
|
|
|
2021-12-14 11:55:17 +00:00
|
|
|
// EventTypeCode represents an event type by a small integer code.
type EventTypeCode int8

// EventTypeCode definitions. Values are assigned sequentially by iota,
// starting with DescriptorEventType = 0. EventTypeEnd is the value that
// follows the last named event type.
const (
	DescriptorEventType EventTypeCode = iota
	InsertEventType
	DeleteEventType
	CreateCollectionEventType
	DropCollectionEventType
	CreatePartitionEventType
	DropPartitionEventType
	IndexFileEventType
	EventTypeEnd
)

// String returns the name of the event type. Any code that has no name,
// including EventTypeEnd and out-of-range values, yields "InvalidEventType".
//
// A switch over the constants is used so that no lookup map has to be
// constructed on each call.
func (code EventTypeCode) String() string {
	switch code {
	case DescriptorEventType:
		return "DescriptorEventType"
	case InsertEventType:
		return "InsertEventType"
	case DeleteEventType:
		return "DeleteEventType"
	case CreateCollectionEventType:
		return "CreateCollectionEventType"
	case DropCollectionEventType:
		return "DropCollectionEventType"
	case CreatePartitionEventType:
		return "CreatePartitionEventType"
	case DropPartitionEventType:
		return "DropPartitionEventType"
	case IndexFileEventType:
		return "IndexFileEventType"
	default:
		return "InvalidEventType"
	}
}
|
|
|
|
|
|
|
|
type descriptorEvent struct {
|
|
|
|
descriptorEventHeader
|
|
|
|
descriptorEventData
|
|
|
|
}
|
|
|
|
|
2021-12-17 14:02:42 +00:00
|
|
|
// GetMemoryUsageInBytes returns descriptor Event memory usage in bytes
|
2020-12-09 01:55:56 +00:00
|
|
|
func (event *descriptorEvent) GetMemoryUsageInBytes() int32 {
|
|
|
|
return event.descriptorEventHeader.GetMemoryUsageInBytes() + event.descriptorEventData.GetMemoryUsageInBytes()
|
|
|
|
}
|
|
|
|
|
2021-12-17 14:02:42 +00:00
|
|
|
// Write writes descriptor event into buffer
|
2020-12-09 01:55:56 +00:00
|
|
|
func (event *descriptorEvent) Write(buffer io.Writer) error {
|
2021-09-23 09:23:54 +00:00
|
|
|
err := event.descriptorEventData.FinishExtra()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
event.descriptorEventHeader.EventLength = event.descriptorEventHeader.GetMemoryUsageInBytes() + event.descriptorEventData.GetMemoryUsageInBytes()
|
|
|
|
event.descriptorEventHeader.NextPosition = int32(binary.Size(MagicNumber)) + event.descriptorEventHeader.EventLength
|
|
|
|
|
2020-12-09 01:55:56 +00:00
|
|
|
if err := event.descriptorEventHeader.Write(buffer); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := event.descriptorEventData.Write(buffer); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-12-14 12:05:09 +00:00
|
|
|
// EventWriter abstracts event writer
|
2020-12-09 01:55:56 +00:00
|
|
|
type EventWriter interface {
|
|
|
|
PayloadWriterInterface
|
|
|
|
// Finish set meta in header and no data can be added to event writer
|
|
|
|
Finish() error
|
|
|
|
// Close release resources
|
2021-12-09 04:37:06 +00:00
|
|
|
Close()
|
2020-12-09 01:55:56 +00:00
|
|
|
// Write serialize to buffer, should call Finish first
|
|
|
|
Write(buffer *bytes.Buffer) error
|
|
|
|
GetMemoryUsageInBytes() (int32, error)
|
2020-12-10 07:50:09 +00:00
|
|
|
SetOffset(offset int32)
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type baseEventWriter struct {
|
|
|
|
eventHeader
|
|
|
|
PayloadWriterInterface
|
|
|
|
isClosed bool
|
|
|
|
isFinish bool
|
|
|
|
offset int32
|
|
|
|
getEventDataSize func() int32
|
|
|
|
writeEventData func(buffer io.Writer) error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (writer *baseEventWriter) GetMemoryUsageInBytes() (int32, error) {
|
|
|
|
data, err := writer.GetPayloadBufferFromWriter()
|
|
|
|
if err != nil {
|
|
|
|
return -1, err
|
|
|
|
}
|
2021-07-07 11:10:07 +00:00
|
|
|
size := writer.getEventDataSize() + writer.eventHeader.GetMemoryUsageInBytes() + int32(len(data))
|
|
|
|
return size, nil
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (writer *baseEventWriter) Write(buffer *bytes.Buffer) error {
|
|
|
|
if err := writer.eventHeader.Write(buffer); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := writer.writeEventData(buffer); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
data, err := writer.GetPayloadBufferFromWriter()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-11-02 10:16:32 +00:00
|
|
|
if err := binary.Write(buffer, common.Endian, data); err != nil {
|
2020-12-09 01:55:56 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (writer *baseEventWriter) Finish() error {
|
|
|
|
if !writer.isFinish {
|
|
|
|
writer.isFinish = true
|
|
|
|
if err := writer.FinishPayloadWriter(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
eventLength, err := writer.GetMemoryUsageInBytes()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
writer.EventLength = eventLength
|
|
|
|
writer.NextPosition = eventLength + writer.offset
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-12-09 04:37:06 +00:00
|
|
|
func (writer *baseEventWriter) Close() {
|
2020-12-09 01:55:56 +00:00
|
|
|
if !writer.isClosed {
|
|
|
|
writer.isFinish = true
|
|
|
|
writer.isClosed = true
|
2021-12-09 04:37:06 +00:00
|
|
|
writer.ReleasePayloadWriter()
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-10 07:50:09 +00:00
|
|
|
func (writer *baseEventWriter) SetOffset(offset int32) {
|
|
|
|
writer.offset = offset
|
|
|
|
}
|
|
|
|
|
2020-12-09 01:55:56 +00:00
|
|
|
type insertEventWriter struct {
|
|
|
|
baseEventWriter
|
|
|
|
insertEventData
|
|
|
|
}
|
|
|
|
|
|
|
|
type deleteEventWriter struct {
|
|
|
|
baseEventWriter
|
|
|
|
deleteEventData
|
|
|
|
}
|
|
|
|
|
|
|
|
type createCollectionEventWriter struct {
|
|
|
|
baseEventWriter
|
|
|
|
createCollectionEventData
|
|
|
|
}
|
|
|
|
|
|
|
|
type dropCollectionEventWriter struct {
|
|
|
|
baseEventWriter
|
|
|
|
dropCollectionEventData
|
|
|
|
}
|
|
|
|
|
|
|
|
type createPartitionEventWriter struct {
|
|
|
|
baseEventWriter
|
|
|
|
createPartitionEventData
|
|
|
|
}
|
|
|
|
|
|
|
|
type dropPartitionEventWriter struct {
|
|
|
|
baseEventWriter
|
|
|
|
dropPartitionEventData
|
|
|
|
}
|
|
|
|
|
2021-09-29 01:52:12 +00:00
|
|
|
type indexFileEventWriter struct {
|
|
|
|
baseEventWriter
|
|
|
|
indexFileEventData
|
|
|
|
}
|
|
|
|
|
2021-04-19 02:36:19 +00:00
|
|
|
func newDescriptorEvent() *descriptorEvent {
|
|
|
|
header := newDescriptorEventHeader()
|
|
|
|
data := newDescriptorEventData()
|
2020-12-09 03:18:13 +00:00
|
|
|
return &descriptorEvent{
|
|
|
|
descriptorEventHeader: *header,
|
|
|
|
descriptorEventData: *data,
|
2021-04-19 02:36:19 +00:00
|
|
|
}
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
|
|
|
|
enhance: add delta log stream new format reader and writer (#34116)
issue: #34123
Benchmark case: The benchmark runs the Go benchmark function
`BenchmarkDeltalogFormat`, which is included in the files changed. It tests
the performance of serializing and deserializing two different data
formats on a dataset of 10 million delete logs.
Metrics: The benchmarks measure the average time taken per operation
(ns/op), memory allocated per operation (MB/op), and the number of
memory allocations per operation (allocs/op).
| Test Name | Avg Time (ns/op) | Time Comparison | Memory Allocation (MB/op) | Memory Comparison | Allocation Count (allocs/op) | Allocation Comparison |
|---------------------------------|------------------|-----------------|---------------------------|-------------------|------------------------------|------------------------|
| one_string_format_reader | 2,781,990,000 | Baseline | 2,422 | Baseline | 20,336,539 | Baseline |
| pk_ts_separate_format_reader | 480,682,639 | -82.72% | 1,765 | -27.14% | 20,396,958 | +0.30% |
| one_string_format_writer | 5,483,436,041 | Baseline | 13,900 | Baseline | 70,057,473 | Baseline |
| pk_and_ts_separate_format_writer| 798,591,584 | -85.43% | 2,178 | -84.34% | 30,270,488 | -56.78% |
Both read and write operations show significant improvements in both
speed and memory allocation.
Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
2024-07-06 01:08:09 +00:00
|
|
|
func NewBaseDescriptorEvent(collectionID int64, partitionID int64, segmentID int64) *descriptorEvent {
|
|
|
|
de := newDescriptorEvent()
|
|
|
|
de.CollectionID = collectionID
|
|
|
|
de.PartitionID = partitionID
|
|
|
|
de.SegmentID = segmentID
|
|
|
|
de.StartTimestamp = 0
|
|
|
|
de.EndTimestamp = 0
|
|
|
|
return de
|
|
|
|
}
|
|
|
|
|
2024-07-17 09:47:44 +00:00
|
|
|
func newInsertEventWriter(dataType schemapb.DataType, opts ...PayloadWriterOptions) (*insertEventWriter, error) {
|
|
|
|
payloadWriter, err := NewPayloadWriter(dataType, opts...)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-04-19 02:36:19 +00:00
|
|
|
header := newEventHeader(InsertEventType)
|
|
|
|
data := newInsertEventData()
|
2020-12-09 03:18:13 +00:00
|
|
|
|
2020-12-09 01:55:56 +00:00
|
|
|
writer := &insertEventWriter{
|
|
|
|
baseEventWriter: baseEventWriter{
|
2020-12-09 03:18:13 +00:00
|
|
|
eventHeader: *header,
|
2020-12-09 01:55:56 +00:00
|
|
|
PayloadWriterInterface: payloadWriter,
|
|
|
|
isClosed: false,
|
|
|
|
isFinish: false,
|
|
|
|
},
|
2020-12-09 03:18:13 +00:00
|
|
|
insertEventData: *data,
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-09 03:18:13 +00:00
|
|
|
writer.baseEventWriter.getEventDataSize = writer.insertEventData.GetEventDataFixPartSize
|
2020-12-09 01:55:56 +00:00
|
|
|
writer.baseEventWriter.writeEventData = writer.insertEventData.WriteEventData
|
|
|
|
return writer, nil
|
|
|
|
}
|
|
|
|
|
2024-07-17 09:47:44 +00:00
|
|
|
func newDeleteEventWriter(dataType schemapb.DataType, opts ...PayloadWriterOptions) (*deleteEventWriter, error) {
|
|
|
|
payloadWriter, err := NewPayloadWriter(dataType, opts...)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-04-19 02:36:19 +00:00
|
|
|
header := newEventHeader(DeleteEventType)
|
|
|
|
data := newDeleteEventData()
|
2021-07-07 11:10:07 +00:00
|
|
|
|
2020-12-09 01:55:56 +00:00
|
|
|
writer := &deleteEventWriter{
|
|
|
|
baseEventWriter: baseEventWriter{
|
2020-12-09 03:18:13 +00:00
|
|
|
eventHeader: *header,
|
2020-12-09 01:55:56 +00:00
|
|
|
PayloadWriterInterface: payloadWriter,
|
|
|
|
isClosed: false,
|
|
|
|
isFinish: false,
|
|
|
|
},
|
2020-12-09 03:18:13 +00:00
|
|
|
deleteEventData: *data,
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-09 03:18:13 +00:00
|
|
|
writer.baseEventWriter.getEventDataSize = writer.deleteEventData.GetEventDataFixPartSize
|
2020-12-09 01:55:56 +00:00
|
|
|
writer.baseEventWriter.writeEventData = writer.deleteEventData.WriteEventData
|
|
|
|
return writer, nil
|
|
|
|
}
|
2021-07-07 11:10:07 +00:00
|
|
|
|
2020-12-10 07:50:09 +00:00
|
|
|
func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollectionEventWriter, error) {
|
2021-03-12 06:22:09 +00:00
|
|
|
if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 {
|
2020-12-10 06:52:42 +00:00
|
|
|
return nil, errors.New("incorrect data type")
|
|
|
|
}
|
|
|
|
|
2024-07-17 09:47:44 +00:00
|
|
|
payloadWriter, err := NewPayloadWriter(dataType)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-04-19 02:36:19 +00:00
|
|
|
header := newEventHeader(CreateCollectionEventType)
|
|
|
|
data := newCreateCollectionEventData()
|
2020-12-09 03:18:13 +00:00
|
|
|
|
2020-12-09 01:55:56 +00:00
|
|
|
writer := &createCollectionEventWriter{
|
|
|
|
baseEventWriter: baseEventWriter{
|
2020-12-09 03:18:13 +00:00
|
|
|
eventHeader: *header,
|
2020-12-09 01:55:56 +00:00
|
|
|
PayloadWriterInterface: payloadWriter,
|
|
|
|
isClosed: false,
|
|
|
|
isFinish: false,
|
|
|
|
},
|
2020-12-09 03:18:13 +00:00
|
|
|
createCollectionEventData: *data,
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-09 03:18:13 +00:00
|
|
|
writer.baseEventWriter.getEventDataSize = writer.createCollectionEventData.GetEventDataFixPartSize
|
2020-12-09 01:55:56 +00:00
|
|
|
writer.baseEventWriter.writeEventData = writer.createCollectionEventData.WriteEventData
|
|
|
|
return writer, nil
|
|
|
|
}
|
2021-07-07 11:10:07 +00:00
|
|
|
|
2020-12-10 07:50:09 +00:00
|
|
|
func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEventWriter, error) {
|
2021-03-12 06:22:09 +00:00
|
|
|
if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 {
|
2020-12-10 06:52:42 +00:00
|
|
|
return nil, errors.New("incorrect data type")
|
|
|
|
}
|
|
|
|
|
2024-07-17 09:47:44 +00:00
|
|
|
payloadWriter, err := NewPayloadWriter(dataType)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-04-19 02:36:19 +00:00
|
|
|
header := newEventHeader(DropCollectionEventType)
|
|
|
|
data := newDropCollectionEventData()
|
2021-07-07 11:10:07 +00:00
|
|
|
|
2020-12-09 01:55:56 +00:00
|
|
|
writer := &dropCollectionEventWriter{
|
|
|
|
baseEventWriter: baseEventWriter{
|
2020-12-09 03:18:13 +00:00
|
|
|
eventHeader: *header,
|
2020-12-09 01:55:56 +00:00
|
|
|
PayloadWriterInterface: payloadWriter,
|
|
|
|
isClosed: false,
|
|
|
|
isFinish: false,
|
|
|
|
},
|
2020-12-09 03:18:13 +00:00
|
|
|
dropCollectionEventData: *data,
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-09 03:18:13 +00:00
|
|
|
writer.baseEventWriter.getEventDataSize = writer.dropCollectionEventData.GetEventDataFixPartSize
|
2020-12-09 01:55:56 +00:00
|
|
|
writer.baseEventWriter.writeEventData = writer.dropCollectionEventData.WriteEventData
|
|
|
|
return writer, nil
|
|
|
|
}
|
2021-07-07 11:10:07 +00:00
|
|
|
|
2020-12-10 07:50:09 +00:00
|
|
|
func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartitionEventWriter, error) {
|
2021-03-12 06:22:09 +00:00
|
|
|
if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 {
|
2020-12-10 06:52:42 +00:00
|
|
|
return nil, errors.New("incorrect data type")
|
|
|
|
}
|
|
|
|
|
2024-07-17 09:47:44 +00:00
|
|
|
payloadWriter, err := NewPayloadWriter(dataType)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-04-19 02:36:19 +00:00
|
|
|
header := newEventHeader(CreatePartitionEventType)
|
|
|
|
data := newCreatePartitionEventData()
|
2020-12-09 03:18:13 +00:00
|
|
|
|
2020-12-09 01:55:56 +00:00
|
|
|
writer := &createPartitionEventWriter{
|
|
|
|
baseEventWriter: baseEventWriter{
|
2020-12-09 03:18:13 +00:00
|
|
|
eventHeader: *header,
|
2020-12-09 01:55:56 +00:00
|
|
|
PayloadWriterInterface: payloadWriter,
|
|
|
|
isClosed: false,
|
|
|
|
isFinish: false,
|
|
|
|
},
|
2020-12-09 03:18:13 +00:00
|
|
|
createPartitionEventData: *data,
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-09 03:18:13 +00:00
|
|
|
writer.baseEventWriter.getEventDataSize = writer.createPartitionEventData.GetEventDataFixPartSize
|
2020-12-09 01:55:56 +00:00
|
|
|
writer.baseEventWriter.writeEventData = writer.createPartitionEventData.WriteEventData
|
|
|
|
return writer, nil
|
|
|
|
}
|
2021-07-07 11:10:07 +00:00
|
|
|
|
2020-12-10 07:50:09 +00:00
|
|
|
func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEventWriter, error) {
|
2021-03-12 06:22:09 +00:00
|
|
|
if dataType != schemapb.DataType_String && dataType != schemapb.DataType_Int64 {
|
2020-12-10 06:52:42 +00:00
|
|
|
return nil, errors.New("incorrect data type")
|
|
|
|
}
|
|
|
|
|
2024-07-17 09:47:44 +00:00
|
|
|
payloadWriter, err := NewPayloadWriter(dataType)
|
2020-12-09 01:55:56 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-04-19 02:36:19 +00:00
|
|
|
header := newEventHeader(DropPartitionEventType)
|
|
|
|
data := newDropPartitionEventData()
|
2021-07-07 11:10:07 +00:00
|
|
|
|
2020-12-09 01:55:56 +00:00
|
|
|
writer := &dropPartitionEventWriter{
|
|
|
|
baseEventWriter: baseEventWriter{
|
2020-12-09 03:18:13 +00:00
|
|
|
eventHeader: *header,
|
2020-12-09 01:55:56 +00:00
|
|
|
PayloadWriterInterface: payloadWriter,
|
|
|
|
isClosed: false,
|
|
|
|
isFinish: false,
|
|
|
|
},
|
2020-12-09 03:18:13 +00:00
|
|
|
dropPartitionEventData: *data,
|
2020-12-09 01:55:56 +00:00
|
|
|
}
|
2020-12-09 03:18:13 +00:00
|
|
|
writer.baseEventWriter.getEventDataSize = writer.dropPartitionEventData.GetEventDataFixPartSize
|
2020-12-09 01:55:56 +00:00
|
|
|
writer.baseEventWriter.writeEventData = writer.dropPartitionEventData.WriteEventData
|
|
|
|
return writer, nil
|
|
|
|
}
|
2021-09-29 01:52:12 +00:00
|
|
|
|
2022-11-18 02:47:08 +00:00
|
|
|
func newIndexFileEventWriter(dataType schemapb.DataType) (*indexFileEventWriter, error) {
|
2024-07-17 09:47:44 +00:00
|
|
|
payloadWriter, err := NewPayloadWriter(dataType)
|
2021-09-29 01:52:12 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
header := newEventHeader(IndexFileEventType)
|
|
|
|
data := newIndexFileEventData()
|
|
|
|
|
|
|
|
writer := &indexFileEventWriter{
|
|
|
|
baseEventWriter: baseEventWriter{
|
|
|
|
eventHeader: *header,
|
|
|
|
PayloadWriterInterface: payloadWriter,
|
|
|
|
isClosed: false,
|
|
|
|
isFinish: false,
|
|
|
|
},
|
|
|
|
indexFileEventData: *data,
|
|
|
|
}
|
|
|
|
writer.baseEventWriter.getEventDataSize = writer.indexFileEventData.GetEventDataFixPartSize
|
|
|
|
writer.baseEventWriter.writeEventData = writer.indexFileEventData.WriteEventData
|
|
|
|
|
|
|
|
return writer, nil
|
|
|
|
}
|