From a5ad70a5ab1a4da5d39b5fc9e158b7d94820a58c Mon Sep 17 00:00:00 2001 From: godchen Date: Mon, 19 Apr 2021 10:36:19 +0800 Subject: [PATCH] Add unittest for storage Signed-off-by: godchen --- internal/storage/binlog_test.go | 254 ++++++++++- internal/storage/binlog_writer.go | 27 +- internal/storage/binlog_writer_test.go | 3 +- internal/storage/cwrapper/ParquetWrapper.cpp | 3 +- internal/storage/data_codec.go | 23 +- internal/storage/data_codec_test.go | 83 +++- internal/storage/event_data.go | 39 +- internal/storage/event_header.go | 8 +- internal/storage/event_test.go | 177 ++++++- internal/storage/event_writer.go | 74 +-- internal/storage/payload.go | 27 +- internal/storage/payload_test.go | 457 +++++++++++++++++++ internal/storage/print_binglog_test.go | 4 +- internal/storage/unsafe_test.go | 33 ++ 14 files changed, 1058 insertions(+), 154 deletions(-) create mode 100644 internal/storage/unsafe_test.go diff --git a/internal/storage/binlog_test.go b/internal/storage/binlog_test.go index 49f957a369..b6fef656b0 100644 --- a/internal/storage/binlog_test.go +++ b/internal/storage/binlog_test.go @@ -1,7 +1,9 @@ package storage import ( + "bytes" "encoding/binary" + "fmt" "testing" "time" "unsafe" @@ -12,8 +14,7 @@ import ( ) func TestInsertBinlog(t *testing.T) { - w, err := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) - assert.Nil(t, err) + w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) e1, err := w.NextInsertEventWriter() assert.Nil(t, err) @@ -272,8 +273,7 @@ func TestInsertBinlog(t *testing.T) { } func TestDeleteBinlog(t *testing.T) { - w, err := NewDeleteBinlogWriter(schemapb.DataType_Int64, 50) - assert.Nil(t, err) + w := NewDeleteBinlogWriter(schemapb.DataType_Int64, 50) e1, err := w.NextDeleteEventWriter() assert.Nil(t, err) @@ -532,8 +532,7 @@ func TestDeleteBinlog(t *testing.T) { } func TestDDLBinlog1(t *testing.T) { - w, err := NewDDLBinlogWriter(schemapb.DataType_Int64, 50) - assert.Nil(t, err) + w := NewDDLBinlogWriter(schemapb.DataType_Int64, 50) e1, err := w.NextCreateCollectionEventWriter() assert.Nil(t, err) @@ -792,8 +791,7 @@ func TestDDLBinlog1(t *testing.T) { } func TestDDLBinlog2(t *testing.T) { - w, err := NewDDLBinlogWriter(schemapb.DataType_Int64, 50) - assert.Nil(t, err) + w := NewDDLBinlogWriter(schemapb.DataType_Int64, 50) e1, err := w.NextCreatePartitionEventWriter() assert.Nil(t, err) @@ -1050,3 +1048,243 @@ func TestDDLBinlog2(t *testing.T) { assert.Equal(t, ed2.StartTimestamp, Timestamp(300)) assert.Equal(t, ed2.EndTimestamp, Timestamp(400)) } + +func TestNewBinlogReaderError(t *testing.T) { + data := []byte{} + reader, err := NewBinlogReader(data) + assert.Nil(t, reader) + assert.NotNil(t, err) + + data = []byte{0, 0, 0, 0} + reader, err = NewBinlogReader(data) + assert.Nil(t, reader) + assert.NotNil(t, err) + + buffer := new(bytes.Buffer) + err = binary.Write(buffer, binary.LittleEndian, int32(MagicNumber)) + assert.Nil(t, err) + data = buffer.Bytes() + + reader, err = NewBinlogReader(data) + assert.Nil(t, reader) + assert.NotNil(t, err) + + err = binary.Write(buffer, binary.LittleEndian, int32(555)) + assert.Nil(t, err) + data = buffer.Bytes() + + reader, err = NewBinlogReader(data) + assert.Nil(t, reader) + assert.NotNil(t, err) + + w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) + + w.SetStartTimeStamp(1000) + w.SetEndTimeStamp(2000) + + e1, err := w.NextInsertEventWriter() + assert.Nil(t, err) + err = e1.AddDataToPayload([]int64{1, 2, 3}) + assert.Nil(t, err) + err = e1.AddDataToPayload([]int32{4, 5, 6}) + assert.NotNil(t, err) + err = e1.AddDataToPayload([]int64{4, 5, 6}) + assert.Nil(t, err) + e1.SetStartTimestamp(100) + e1.SetEndTimestamp(200) + + _, err = w.GetBuffer() + assert.NotNil(t, err) + err = w.Close() + assert.Nil(t, err) + + buf, err := w.GetBuffer() + assert.Nil(t, err) + + reader, err = NewBinlogReader(buf) + assert.Nil(t, err) + reader.Close() + + event1, err := reader.NextEventReader() + assert.Nil(t, event1) + assert.NotNil(t, err) + + err = reader.Close() + assert.Nil(t, err) +} + +func TestNewBinlogWriterTsError(t *testing.T) { + w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) + + _, err := w.GetBuffer() + assert.NotNil(t, err) + err = w.Close() + assert.NotNil(t, err) + + w.SetStartTimeStamp(1000) + _, err = w.GetBuffer() + assert.NotNil(t, err) + err = w.Close() + assert.NotNil(t, err) + + w.SetEndTimeStamp(2000) + _, err = w.GetBuffer() + assert.NotNil(t, err) + err = w.Close() + assert.Nil(t, err) + + _, err = w.GetBuffer() + assert.Nil(t, err) + +} + +func TestInsertBinlogWriterCloseError(t *testing.T) { + insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) + e1, err := insertWriter.NextInsertEventWriter() + assert.Nil(t, err) + err = e1.AddDataToPayload([]int64{1, 2, 3}) + assert.Nil(t, err) + e1.SetStartTimestamp(100) + e1.SetEndTimestamp(200) + insertWriter.SetStartTimeStamp(1000) + insertWriter.SetEndTimeStamp(2000) + err = insertWriter.Close() + assert.Nil(t, err) + assert.NotNil(t, insertWriter.buffer) + insertEventWriter, err := insertWriter.NextInsertEventWriter() + assert.Nil(t, insertEventWriter) + assert.NotNil(t, err) + +} + +func TestDeleteBinlogWriteCloseError(t *testing.T) { + deleteWriter := NewDeleteBinlogWriter(schemapb.DataType_Int64, 10) + e1, err := deleteWriter.NextDeleteEventWriter() + assert.Nil(t, err) + err = e1.AddDataToPayload([]int64{1, 2, 3}) + assert.Nil(t, err) + e1.SetStartTimestamp(100) + e1.SetEndTimestamp(200) + deleteWriter.SetStartTimeStamp(1000) + deleteWriter.SetEndTimeStamp(2000) + err = deleteWriter.Close() + assert.Nil(t, err) + assert.NotNil(t, deleteWriter.buffer) + deleteEventWriter, err := deleteWriter.NextDeleteEventWriter() + assert.Nil(t, deleteEventWriter) + assert.NotNil(t, err) +} + +func TestDDBinlogWriteCloseError(t *testing.T) { + ddBinlogWriter := NewDDLBinlogWriter(schemapb.DataType_Int64, 10) + e1, err := ddBinlogWriter.NextCreateCollectionEventWriter() + assert.Nil(t, err) + err = e1.AddDataToPayload([]int64{1, 2, 3}) + assert.Nil(t, err) + e1.SetStartTimestamp(100) + e1.SetEndTimestamp(200) + + ddBinlogWriter.SetStartTimeStamp(1000) + ddBinlogWriter.SetEndTimeStamp(2000) + err = ddBinlogWriter.Close() + assert.Nil(t, err) + assert.NotNil(t, ddBinlogWriter.buffer) + + createCollectionEventWriter, err := ddBinlogWriter.NextCreateCollectionEventWriter() + assert.Nil(t, createCollectionEventWriter) + assert.NotNil(t, err) + + dropCollectionEventWriter, err := ddBinlogWriter.NextDropCollectionEventWriter() + assert.Nil(t, dropCollectionEventWriter) + assert.NotNil(t, err) + + createPartitionEventWriter, err := ddBinlogWriter.NextCreatePartitionEventWriter() + assert.Nil(t, createPartitionEventWriter) + assert.NotNil(t, err) + + dropPartitionEventWriter, err := ddBinlogWriter.NextDropPartitionEventWriter() + assert.Nil(t, dropPartitionEventWriter) + assert.NotNil(t, err) + +} + +type testEvent struct { + PayloadWriterInterface + finishError bool + writeError bool + getMemoryError bool + getPayloadLengthError bool + releasePayloadError bool +} + +func (e *testEvent) Finish() error { + if e.finishError { + return fmt.Errorf("finish error") + } + return nil +} + +func (e *testEvent) Close() error { + return nil +} + +func (e *testEvent) Write(buffer *bytes.Buffer) error { + if e.writeError { + return fmt.Errorf("write error") + } + return nil +} + +func (e *testEvent) GetMemoryUsageInBytes() (int32, error) { + if e.getMemoryError { + return -1, fmt.Errorf("getMemory error") + } + return 0, nil +} +func (e *testEvent) GetPayloadLengthFromWriter() (int, error) { + if e.getPayloadLengthError { + return -1, fmt.Errorf("getPayloadLength error") + } + return 0, nil +} + +func (e *testEvent) ReleasePayloadWriter() error { + if e.releasePayloadError { + return fmt.Errorf("releasePayload error") + } + return nil +} + +func (e *testEvent) SetOffset(offset int32) { + +} + +var _ EventWriter = (*testEvent)(nil) + +func TestWriterListError(t *testing.T) { + insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) + errorEvent := &testEvent{} + insertWriter.eventWriters = append(insertWriter.eventWriters, errorEvent) + insertWriter.SetStartTimeStamp(1000) + insertWriter.SetEndTimeStamp(2000) + errorEvent.releasePayloadError = true + err := insertWriter.Close() + assert.NotNil(t, err) + insertWriter.buffer = nil + errorEvent.getPayloadLengthError = true + err = insertWriter.Close() + assert.NotNil(t, err) + insertWriter.buffer = nil + errorEvent.getMemoryError = true + err = insertWriter.Close() + assert.NotNil(t, err) + insertWriter.buffer = nil + errorEvent.writeError = true + err = insertWriter.Close() + assert.NotNil(t, err) + insertWriter.buffer = nil + errorEvent.finishError = true + err = insertWriter.Close() + assert.NotNil(t, err) + +} diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 97f38a69fe..13c44fd72a 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -204,11 +204,8 @@ func (writer *DDLBinlogWriter) NextDropPartitionEventWriter() (*dropPartitionEve return event, nil } -func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID, FieldID int64) (*InsertBinlogWriter, error) { - descriptorEvent, err := newDescriptorEvent() - if err != nil { - return nil, err - } +func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID, FieldID int64) *InsertBinlogWriter { + descriptorEvent := newDescriptorEvent() descriptorEvent.PayloadDataType = dataType descriptorEvent.CollectionID = collectionID descriptorEvent.PartitionID = partitionID @@ -222,13 +219,10 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID eventWriters: make([]EventWriter, 0), buffer: nil, }, - }, nil -} -func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DeleteBinlogWriter, error) { - descriptorEvent, err := newDescriptorEvent() - if err != nil { - return nil, err } +} +func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) *DeleteBinlogWriter { + descriptorEvent := newDescriptorEvent() descriptorEvent.PayloadDataType = dataType descriptorEvent.CollectionID = collectionID return &DeleteBinlogWriter{ @@ -239,13 +233,10 @@ func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) (*Del eventWriters: make([]EventWriter, 0), buffer: nil, }, - }, nil -} -func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DDLBinlogWriter, error) { - descriptorEvent, err := newDescriptorEvent() - if err != nil { - return nil, err } +} +func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) *DDLBinlogWriter { + descriptorEvent := newDescriptorEvent() descriptorEvent.PayloadDataType = dataType descriptorEvent.CollectionID = collectionID return &DDLBinlogWriter{ @@ -256,5 +247,5 @@ func NewDDLBinlogWriter(dataType schemapb.DataType, collectionID int64) (*DDLBin eventWriters: make([]EventWriter, 0), buffer: nil, }, - }, nil + } } diff --git a/internal/storage/binlog_writer_test.go b/internal/storage/binlog_writer_test.go index 613e41e831..c5beafa316 100644 --- a/internal/storage/binlog_writer_test.go +++ b/internal/storage/binlog_writer_test.go @@ -10,11 +10,10 @@ import ( ) func TestBinlogWriterReader(t *testing.T) { - binlogWriter, err := NewInsertBinlogWriter(schemapb.DataType_Int32, 10, 20, 30, 40) + binlogWriter := NewInsertBinlogWriter(schemapb.DataType_Int32, 10, 20, 30, 40) binlogWriter.SetStartTimeStamp(1000) binlogWriter.SetEndTimeStamp(2000) defer binlogWriter.Close() - assert.Nil(t, err) eventWriter, err := binlogWriter.NextInsertEventWriter() assert.Nil(t, err) err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3}) diff --git a/internal/storage/cwrapper/ParquetWrapper.cpp b/internal/storage/cwrapper/ParquetWrapper.cpp index c586597286..f2721b297a 100644 --- a/internal/storage/cwrapper/ParquetWrapper.cpp +++ b/internal/storage/cwrapper/ParquetWrapper.cpp @@ -296,6 +296,7 @@ CBuffer GetPayloadBufferFromWriter(CPayloadWriter payloadWriter) { if (p->output == nullptr) { buf.length = 0; buf.data = nullptr; + return buf; } auto &output = p->output->Buffer(); buf.length = static_cast(output.size()); @@ -489,4 +490,4 @@ extern "C" CStatus ReleasePayloadReader(CPayloadReader payloadReader) { delete[] p->bValues; delete p; return st; -} \ No newline at end of file +} diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index c10357e0ea..32decf3b63 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -50,10 +50,6 @@ func (s BlobList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func NewBlob(key string, value []byte) *Blob { - return &Blob{key, value} -} - func (b Blob) GetKey() string { return b.Key } @@ -134,7 +130,6 @@ func NewInsertCodec(schema *etcdpb.CollectionMeta) *InsertCodec { func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) { var blobs []*Blob var writer *InsertBinlogWriter - var err error timeFieldData, ok := data.Data[ms.TimeStampField] if !ok { return nil, errors.New("data doesn't contains timestamp field") @@ -143,10 +138,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique for _, field := range insertCodec.Schema.Schema.Fields { singleData := data.Data[field.FieldID] - writer, err = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID) - if err != nil { - return nil, err - } + writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID) eventWriter, err := writer.NextInsertEventWriter() if err != nil { return nil, err @@ -185,9 +177,6 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique if err != nil { return nil, err } - if writer == nil { - return nil, errors.New("binlog writer is nil") - } writer.SetStartTimeStamp(typeutil.Timestamp(ts[0])) writer.SetEndTimeStamp(typeutil.Timestamp(ts[len(ts)-1])) @@ -442,10 +431,7 @@ func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec { } func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) { - writer, err := NewDDLBinlogWriter(schemapb.DataType_Int64, dataDefinitionCodec.collectionID) - if err != nil { - return nil, err - } + writer := NewDDLBinlogWriter(schemapb.DataType_Int64, dataDefinitionCodec.collectionID) var blobs []*Blob @@ -478,10 +464,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ Value: buffer, }) - writer, err = NewDDLBinlogWriter(schemapb.DataType_String, dataDefinitionCodec.collectionID) - if err != nil { - return nil, err - } + writer = NewDDLBinlogWriter(schemapb.DataType_String, dataDefinitionCodec.collectionID) for pos, req := range ddRequests { switch eventTypes[pos] { diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index cdba1f8624..0ada4d60f6 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -10,7 +10,7 @@ import ( ) func TestInsertCodec(t *testing.T) { - Schema := &etcdpb.CollectionMeta{ + schema := &etcdpb.CollectionMeta{ ID: 1, CreateTime: 1, SegmentIDs: []int64{0, 1}, @@ -107,7 +107,7 @@ func TestInsertCodec(t *testing.T) { }, }, } - insertCodec := NewInsertCodec(Schema) + insertCodec := NewInsertCodec(schema) insertDataFirst := &InsertData{ Data: map[int64]FieldData{ 0: &Int64FieldData{ @@ -221,11 +221,13 @@ func TestInsertCodec(t *testing.T) { assert.Nil(t, err) for _, blob := range firstBlobs { blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 100) + assert.Equal(t, blob.GetKey(), blob.Key) } secondBlobs, err := insertCodec.Serialize(1, 1, insertDataSecond) assert.Nil(t, err) for _, blob := range secondBlobs { blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99) + assert.Equal(t, blob.GetKey(), blob.Key) } resultBlobs := append(firstBlobs, secondBlobs...) partitionID, segmentID, resultData, err := insertCodec.Deserialize(resultBlobs) @@ -258,6 +260,10 @@ func TestInsertCodec(t *testing.T) { assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7}, resultData.Data[109].(*FloatVectorFieldData).Data) assert.Nil(t, insertCodec.Close()) + + blobs := []*Blob{} + _, _, _, err = insertCodec.Deserialize(blobs) + assert.NotNil(t, err) } func TestDDCodec(t *testing.T) { dataDefinitionCodec := NewDataDefinitionCodec(int64(1)) @@ -289,6 +295,10 @@ func TestDDCodec(t *testing.T) { assert.Equal(t, resultTs, ts) assert.Equal(t, resultRequests, ddRequests) assert.Nil(t, dataDefinitionCodec.Close()) + + blobs = []*Blob{} + _, _, err = dataDefinitionCodec.Deserialize(blobs) + assert.NotNil(t, err) } func TestIndexCodec(t *testing.T) { @@ -323,4 +333,73 @@ func TestIndexCodec(t *testing.T) { assert.EqualValues(t, indexParams, indexParamsOutput) assert.EqualValues(t, "index_test_name", indexName) assert.EqualValues(t, 1234, indexID) + + blobs = []*Blob{} + _, _, _, _, err = indexCodec.Deserialize(blobs) + assert.NotNil(t, err) +} + +func TestTsError(t *testing.T) { + insertData := &InsertData{} + insertCodec := NewInsertCodec(nil) + blobs, err := insertCodec.Serialize(1, 1, insertData) + assert.Nil(t, blobs) + assert.NotNil(t, err) +} + +func TestSchemaError(t *testing.T) { + schema := &etcdpb.CollectionMeta{ + ID: 1, + CreateTime: 1, + SegmentIDs: []int64{0, 1}, + PartitionTags: []string{"partition_0", "partition_1"}, + Schema: &schemapb.CollectionSchema{ + Name: "schema", + Description: "schema", + AutoID: true, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 0, + Name: "row_id", + IsPrimaryKey: false, + Description: "row_id", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 1, + Name: "Ts", + IsPrimaryKey: false, + Description: "Ts", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, + Name: "field_bool", + IsPrimaryKey: false, + Description: "description_2", + DataType: 999, + }, + }, + }, + } + insertData := &InsertData{ + Data: map[int64]FieldData{ + 0: &Int64FieldData{ + NumRows: 2, + Data: []int64{3, 4}, + }, + 1: &Int64FieldData{ + NumRows: 2, + Data: []int64{3, 4}, + }, + 100: &BoolFieldData{ + NumRows: 2, + Data: []bool{true, false}, + }, + }, + } + insertCodec := NewInsertCodec(schema) + blobs, err := insertCodec.Serialize(1, 1, insertData) + assert.Nil(t, blobs) + assert.NotNil(t, err) } diff --git a/internal/storage/event_data.go b/internal/storage/event_data.go index 1c2fc98fb7..5eb7873342 100644 --- a/internal/storage/event_data.go +++ b/internal/storage/event_data.go @@ -2,7 +2,6 @@ package storage import ( "encoding/binary" - "fmt" "io" "errors" @@ -54,10 +53,7 @@ func (data *descriptorEventData) Write(buffer io.Writer) error { } func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) { - event, err := newDescriptorEventData() - if err != nil { - return nil, err - } + event := newDescriptorEventData() if err := binary.Read(buffer, binary.LittleEndian, &event.DescriptorEventDataFixPart); err != nil { return nil, err @@ -252,7 +248,7 @@ func getEventFixPartSize(code EventTypeCode) int32 { case DropCollectionEventType: return (&dropCollectionEventData{}).GetEventDataFixPartSize() case CreatePartitionEventType: - return (&createCollectionEventData{}).GetEventDataFixPartSize() + return (&createPartitionEventData{}).GetEventDataFixPartSize() case DropPartitionEventType: return (&dropPartitionEventData{}).GetEventDataFixPartSize() default: @@ -260,7 +256,7 @@ func getEventFixPartSize(code EventTypeCode) int32 { } } -func newDescriptorEventData() (*descriptorEventData, error) { +func newDescriptorEventData() *descriptorEventData { data := descriptorEventData{ DescriptorEventDataFixPart: DescriptorEventDataFixPart{ BinlogVersion: BinlogVersion, @@ -278,49 +274,46 @@ func newDescriptorEventData() (*descriptorEventData, error) { } for i := DescriptorEventType; i < EventTypeEnd; i++ { size := getEventFixPartSize(i) - if size == -1 { - return nil, fmt.Errorf("undefined event type %d", i) - } data.PostHeaderLengths = append(data.PostHeaderLengths, uint8(size)) } - return &data, nil + return &data } -func newInsertEventData() (*insertEventData, error) { +func newInsertEventData() *insertEventData { return &insertEventData{ StartTimestamp: 0, EndTimestamp: 0, - }, nil + } } -func newDeleteEventData() (*deleteEventData, error) { +func newDeleteEventData() *deleteEventData { return &deleteEventData{ StartTimestamp: 0, EndTimestamp: 0, - }, nil + } } -func newCreateCollectionEventData() (*createCollectionEventData, error) { +func newCreateCollectionEventData() *createCollectionEventData { return &createCollectionEventData{ StartTimestamp: 0, EndTimestamp: 0, - }, nil + } } -func newDropCollectionEventData() (*dropCollectionEventData, error) { +func newDropCollectionEventData() *dropCollectionEventData { return &dropCollectionEventData{ StartTimestamp: 0, EndTimestamp: 0, - }, nil + } } -func newCreatePartitionEventData() (*createPartitionEventData, error) { +func newCreatePartitionEventData() *createPartitionEventData { return &createPartitionEventData{ StartTimestamp: 0, EndTimestamp: 0, - }, nil + } } -func newDropPartitionEventData() (*dropPartitionEventData, error) { +func newDropPartitionEventData() *dropPartitionEventData { return &dropPartitionEventData{ StartTimestamp: 0, EndTimestamp: 0, - }, nil + } } func readInsertEventDataFixPart(buffer io.Reader) (*insertEventData, error) { diff --git a/internal/storage/event_header.go b/internal/storage/event_header.go index b37194dfe4..8d45ba05b6 100644 --- a/internal/storage/event_header.go +++ b/internal/storage/event_header.go @@ -48,16 +48,16 @@ func readDescriptorEventHeader(buffer io.Reader) (*descriptorEventHeader, error) return header, nil } -func newDescriptorEventHeader() (*descriptorEventHeader, error) { +func newDescriptorEventHeader() *descriptorEventHeader { header := descriptorEventHeader{ Timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0), TypeCode: DescriptorEventType, ServerID: ServerID, } - return &header, nil + return &header } -func newEventHeader(eventTypeCode EventTypeCode) (*eventHeader, error) { +func newEventHeader(eventTypeCode EventTypeCode) *eventHeader { return &eventHeader{ baseEventHeader: baseEventHeader{ Timestamp: tsoutil.ComposeTS(time.Now().UnixNano()/int64(time.Millisecond), 0), @@ -66,5 +66,5 @@ func newEventHeader(eventTypeCode EventTypeCode) (*eventHeader, error) { EventLength: -1, NextPosition: -1, }, - }, nil + } } diff --git a/internal/storage/event_test.go b/internal/storage/event_test.go index 1defcc9c1b..aec7188b0b 100644 --- a/internal/storage/event_test.go +++ b/internal/storage/event_test.go @@ -34,12 +34,11 @@ func checkEventHeader( } func TestDescriptorEvent(t *testing.T) { - desc, err := newDescriptorEvent() - assert.Nil(t, err) + desc := newDescriptorEvent() var buf bytes.Buffer - err = desc.Write(&buf) + err := desc.Write(&buf) assert.Nil(t, err) buffer := buf.Bytes() @@ -1147,3 +1146,175 @@ func TestDropPartitionEvent(t *testing.T) { }) } + +func TestDescriptorEventTsError(t *testing.T) { + insertData := &insertEventData{ + StartTimestamp: 0, + EndTimestamp: 0, + } + buf := new(bytes.Buffer) + err := insertData.WriteEventData(buf) + assert.NotNil(t, err) + insertData.StartTimestamp = 1000 + err = insertData.WriteEventData(buf) + assert.NotNil(t, err) + + deleteData := &deleteEventData{ + StartTimestamp: 0, + EndTimestamp: 0, + } + err = deleteData.WriteEventData(buf) + assert.NotNil(t, err) + deleteData.StartTimestamp = 1000 + err = deleteData.WriteEventData(buf) + assert.NotNil(t, err) + + createCollectionData := &createCollectionEventData{ + StartTimestamp: 0, + EndTimestamp: 0, + } + err = createCollectionData.WriteEventData(buf) + assert.NotNil(t, err) + createCollectionData.StartTimestamp = 1000 + err = createCollectionData.WriteEventData(buf) + assert.NotNil(t, err) + + dropCollectionData := &dropCollectionEventData{ + StartTimestamp: 0, + EndTimestamp: 0, + } + err = dropCollectionData.WriteEventData(buf) + assert.NotNil(t, err) + dropCollectionData.StartTimestamp = 1000 + err = dropCollectionData.WriteEventData(buf) + assert.NotNil(t, err) + + createPartitionData := &createPartitionEventData{ + StartTimestamp: 0, + EndTimestamp: 0, + } + err = createPartitionData.WriteEventData(buf) + assert.NotNil(t, err) + createPartitionData.StartTimestamp = 1000 + err = createPartitionData.WriteEventData(buf) + assert.NotNil(t, err) + + dropPartitionData := &dropPartitionEventData{ + StartTimestamp: 0, + EndTimestamp: 0, + } + err = dropPartitionData.WriteEventData(buf) + assert.NotNil(t, err) + dropPartitionData.StartTimestamp = 1000 + err = dropPartitionData.WriteEventData(buf) + assert.NotNil(t, err) +} + +func TestReadFixPartError(t *testing.T) { + buf := new(bytes.Buffer) + _, err := readEventHeader(buf) + assert.NotNil(t, err) + + _, err = readInsertEventDataFixPart(buf) + assert.NotNil(t, err) + + _, err = readDeleteEventDataFixPart(buf) + assert.NotNil(t, err) + + _, err = readCreateCollectionEventDataFixPart(buf) + assert.NotNil(t, err) + + _, err = readDropCollectionEventDataFixPart(buf) + assert.NotNil(t, err) + + _, err = readCreatePartitionEventDataFixPart(buf) + assert.NotNil(t, err) + + _, err = readDropPartitionEventDataFixPart(buf) + assert.NotNil(t, err) + + _, err = readDescriptorEventData(buf) + assert.NotNil(t, err) + + event := newDescriptorEventData() + err = binary.Write(buf, binary.LittleEndian, event.DescriptorEventDataFixPart) + assert.Nil(t, err) + _, err = readDescriptorEventData(buf) + assert.NotNil(t, err) + + size := getEventFixPartSize(EventTypeCode(10)) + assert.Equal(t, size, int32(-1)) +} + +func TestEventReaderError(t *testing.T) { + buf := new(bytes.Buffer) + r, err := newEventReader(schemapb.DataType_Int64, buf) + assert.Nil(t, r) + assert.NotNil(t, err) + + header := newEventHeader(DescriptorEventType) + err = header.Write(buf) + assert.Nil(t, err) + + r, err = newEventReader(schemapb.DataType_Int64, buf) + assert.Nil(t, r) + assert.NotNil(t, err) + + buf = new(bytes.Buffer) + header = newEventHeader(InsertEventType) + err = header.Write(buf) + assert.Nil(t, err) + + r, err = newEventReader(schemapb.DataType_Int64, buf) + assert.Nil(t, r) + assert.NotNil(t, err) + + buf = new(bytes.Buffer) + header = newEventHeader(InsertEventType) + header.EventLength = getEventFixPartSize(InsertEventType) + int32(binary.Size(header)) + err = header.Write(buf) + assert.Nil(t, err) + + insertData := &insertEventData{ + StartTimestamp: 1000, + EndTimestamp: 2000, + } + err = binary.Write(buf, binary.LittleEndian, insertData) + assert.Nil(t, err) + + r, err = newEventReader(schemapb.DataType_Int64, buf) + assert.Nil(t, r) + assert.NotNil(t, err) + +} + +func TestEventClose(t *testing.T) { + w, err := newInsertEventWriter(schemapb.DataType_String) + assert.Nil(t, err) + w.SetStartTimestamp(tsoutil.ComposeTS(10, 0)) + w.SetEndTimestamp(tsoutil.ComposeTS(100, 0)) + err = w.AddDataToPayload("1234") + assert.Nil(t, err) + err = w.Finish() + assert.Nil(t, err) + + var buf bytes.Buffer + err = w.Write(&buf) + assert.Nil(t, err) + err = w.Close() + assert.Nil(t, err) + + wBuf := buf.Bytes() + r, err := newEventReader(schemapb.DataType_String, bytes.NewBuffer(wBuf)) + assert.Nil(t, err) + + err = r.Close() + assert.Nil(t, err) + err = r.Close() + assert.Nil(t, err) + + _, err = r.readHeader() + assert.NotNil(t, err) + _, err = r.readData() + assert.NotNil(t, err) +} diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go index bbf0900b06..48a6ba5b0a 100644 --- a/internal/storage/event_writer.go +++ b/internal/storage/event_writer.go @@ -177,22 +177,16 @@ type dropPartitionEventWriter struct { dropPartitionEventData } -func newDescriptorEvent() (*descriptorEvent, error) { - header, err := newDescriptorEventHeader() - if err != nil { - return nil, err - } - data, err := newDescriptorEventData() - if err != nil { - return nil, err - } +func newDescriptorEvent() *descriptorEvent { + header := newDescriptorEventHeader() + data := newDescriptorEventData() header.EventLength = header.GetMemoryUsageInBytes() + data.GetMemoryUsageInBytes() header.NextPosition = int32(binary.Size(MagicNumber)) + header.EventLength data.HeaderLength = int8(binary.Size(eventHeader{})) return &descriptorEvent{ descriptorEventHeader: *header, descriptorEventData: *data, - }, err + } } func newInsertEventWriter(dataType schemapb.DataType) (*insertEventWriter, error) { @@ -200,14 +194,8 @@ func newInsertEventWriter(dataType schemapb.DataType) (*insertEventWriter, error if err != nil { return nil, err } - header, err := newEventHeader(InsertEventType) - if err != nil { - return nil, err - } - data, err := newInsertEventData() - if err != nil { - return nil, err - } + header := newEventHeader(InsertEventType) + data := newInsertEventData() writer := &insertEventWriter{ baseEventWriter: baseEventWriter{ @@ -228,14 +216,8 @@ func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error if err != nil { return nil, err } - header, err := newEventHeader(DeleteEventType) - if err != nil { - return nil, err - } - data, err := newDeleteEventData() - if err != nil { - return nil, err - } + header := newEventHeader(DeleteEventType) + data := newDeleteEventData() writer := &deleteEventWriter{ baseEventWriter: baseEventWriter{ eventHeader: *header, @@ -258,14 +240,8 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollecti if err != nil { return nil, err } - header, err := newEventHeader(CreateCollectionEventType) - if err != nil { - return nil, err - } - data, err := newCreateCollectionEventData() - if err != nil { - return nil, err - } + header := newEventHeader(CreateCollectionEventType) + data := newCreateCollectionEventData() writer := &createCollectionEventWriter{ baseEventWriter: baseEventWriter{ @@ -289,14 +265,8 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv if err != nil { return nil, err } - header, err := newEventHeader(DropCollectionEventType) - if err != nil { - return nil, err - } - data, err := newDropCollectionEventData() - if err != nil { - return nil, err - } + header := newEventHeader(DropCollectionEventType) + data := newDropCollectionEventData() writer := &dropCollectionEventWriter{ baseEventWriter: baseEventWriter{ eventHeader: *header, @@ -319,14 +289,8 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartition if err != nil { return nil, err } - header, err := newEventHeader(CreatePartitionEventType) - if err != nil { - return nil, err - } - data, err := newCreatePartitionEventData() - if err != nil { - return nil, err - } + header := newEventHeader(CreatePartitionEventType) + data := newCreatePartitionEventData() writer := &createPartitionEventWriter{ baseEventWriter: baseEventWriter{ @@ -350,14 +314,8 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven if err != nil { return nil, err } - header, err := newEventHeader(DropPartitionEventType) - if err != nil { - return nil, err - } - data, err := newDropPartitionEventData() - if err != nil { - return nil, err - } + header := newEventHeader(DropPartitionEventType) + data := newDropPartitionEventData() writer := &dropPartitionEventWriter{ baseEventWriter: baseEventWriter{ eventHeader: *header, diff --git a/internal/storage/payload.go b/internal/storage/payload.go index 18e5949fb8..ed664fce22 100644 --- a/internal/storage/payload.go +++ b/internal/storage/payload.go @@ -52,17 +52,15 @@ type PayloadReaderInterface interface { ReleasePayloadReader() error Close() error } -type ( - PayloadWriter struct { - payloadWriterPtr C.CPayloadWriter - colType schemapb.DataType - } +type PayloadWriter struct { + payloadWriterPtr C.CPayloadWriter + colType schemapb.DataType +} - PayloadReader struct { - payloadReaderPtr C.CPayloadReader - colType schemapb.DataType - } -) +type PayloadReader struct { + payloadReaderPtr C.CPayloadReader + colType schemapb.DataType +} func NewPayloadWriter(colType schemapb.DataType) (*PayloadWriter, error) { w := C.NewPayloadWriter(C.int(colType)) @@ -131,6 +129,8 @@ func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error { return errors.New("incorrect data type") } return w.AddOneStringToPayload(val) + default: + return errors.New("incorrect datatype") } case 1: switch w.colType { @@ -147,13 +147,14 @@ func (w *PayloadWriter) AddDataToPayload(msgs interface{}, dim ...int) error { return errors.New("incorrect data type") } return w.AddFloatVectorToPayload(val, dim[0]) + default: + return errors.New("incorrect datatype") } default: return errors.New("incorrect input numbers") } - return nil } func (w *PayloadWriter) AddBoolToPayload(msgs []bool) error { @@ -433,6 +434,8 @@ func (r *PayloadReader) GetDataFromPayload(idx ...int) (interface{}, int, error) case schemapb.DataType_String: val, err := r.GetOneStringFromPayload(idx[0]) return val, 0, err + default: + return nil, 0, errors.New("Unknown type") } case 0: switch r.colType { @@ -475,8 +478,6 @@ func (r *PayloadReader) GetDataFromPayload(idx ...int) (interface{}, int, error) default: return nil, 0, errors.New("incorrect number of index") } - - return nil, 0, errors.New("unknown error") } func (r *PayloadReader) ReleasePayloadReader() error { diff --git a/internal/storage/payload_test.go b/internal/storage/payload_test.go index 750e0319b4..77914e1cf9 100644 --- a/internal/storage/payload_test.go +++ b/internal/storage/payload_test.go @@ -423,4 +423,461 @@ func TestPayload_ReaderandWriter(t *testing.T) { assert.ElementsMatch(t, []float32{1.0, 2.0, 3.0, 4.0}, floatVecs) defer r.ReleasePayloadReader() }) + + t.Run("TestAddDataToPayload", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + w.colType = 999 + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddDataToPayload([]bool{false, false, false, false}) + assert.NotNil(t, err) + + err = w.AddDataToPayload([]bool{false, false, false, false}, 0) + assert.NotNil(t, err) + + err = w.AddDataToPayload([]bool{false, false, false, false}, 0, 0) + assert.NotNil(t, err) + + err = w.AddBoolToPayload([]bool{}) + assert.NotNil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddBoolToPayload([]bool{false}) + assert.NotNil(t, err) + }) + + t.Run("TestAddBoolAfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + _, err = w.GetPayloadBufferFromWriter() + assert.NotNil(t, err) + + err = w.AddBoolToPayload([]bool{}) + assert.NotNil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddBoolToPayload([]bool{false}) + assert.NotNil(t, err) + }) + + t.Run("TestAddInt8AfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Int8) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + _, err = w.GetPayloadBufferFromWriter() + assert.NotNil(t, err) + + err = w.AddInt8ToPayload([]int8{}) + assert.NotNil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddInt8ToPayload([]int8{0}) + assert.NotNil(t, err) + }) + t.Run("TestAddInt16AfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Int16) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + _, err = w.GetPayloadBufferFromWriter() + assert.NotNil(t, err) + + err = w.AddInt16ToPayload([]int16{}) + assert.NotNil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddInt16ToPayload([]int16{0}) + assert.NotNil(t, err) + }) + t.Run("TestAddInt32AfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Int32) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + _, err = w.GetPayloadBufferFromWriter() + assert.NotNil(t, err) + + err = w.AddInt32ToPayload([]int32{}) + assert.NotNil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddInt32ToPayload([]int32{0}) + assert.NotNil(t, err) + }) + t.Run("TestAddInt64AfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Int64) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + _, err = w.GetPayloadBufferFromWriter() + assert.NotNil(t, err) + + err = w.AddInt64ToPayload([]int64{}) + assert.NotNil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddInt64ToPayload([]int64{0}) + assert.NotNil(t, err) + }) + t.Run("TestAddFloatAfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Float) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + _, err = w.GetPayloadBufferFromWriter() + assert.NotNil(t, err) + + err = w.AddFloatToPayload([]float32{}) + assert.NotNil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddFloatToPayload([]float32{0.0}) + assert.NotNil(t, err) + }) + t.Run("TestAddDoubleAfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Double) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + _, err = w.GetPayloadBufferFromWriter() + assert.NotNil(t, err) + + err = w.AddDoubleToPayload([]float64{}) + assert.NotNil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddDoubleToPayload([]float64{0.0}) + assert.NotNil(t, err) + }) + t.Run("TestAddStringAfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_String) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + _, err = w.GetPayloadBufferFromWriter() + assert.NotNil(t, err) + + err = w.AddOneStringToPayload("") + assert.NotNil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddOneStringToPayload("c") + assert.NotNil(t, err) + }) + t.Run("TestAddBinVectorAfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_BinaryVector) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + _, err = w.GetPayloadBufferFromWriter() + assert.NotNil(t, err) + + err = w.FinishPayloadWriter() + assert.NotNil(t, err) + + err = w.AddBinaryVectorToPayload([]byte{}, 8) + assert.NotNil(t, err) + err = w.AddBinaryVectorToPayload([]byte{1}, 0) + assert.NotNil(t, err) + + err = w.AddBinaryVectorToPayload([]byte{1, 0, 0, 0, 0, 0, 0, 0}, 8) + assert.Nil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddBinaryVectorToPayload([]byte{1, 0, 0, 0, 0, 0, 0, 0}, 8) + assert.NotNil(t, err) + }) + t.Run("TestAddFloatVectorAfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_FloatVector) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + err = w.FinishPayloadWriter() + assert.NotNil(t, err) + + err = w.AddFloatVectorToPayload([]float32{}, 8) + assert.NotNil(t, err) + err = w.AddFloatVectorToPayload([]float32{1.0}, 0) + assert.NotNil(t, err) + + err = w.AddFloatVectorToPayload([]float32{1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0}, 8) + assert.Nil(t, err) + err = w.FinishPayloadWriter() + assert.Nil(t, err) + err = w.AddFloatVectorToPayload([]float32{1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0}, 8) + assert.NotNil(t, err) + }) + + t.Run("TestNewReadError", func(t *testing.T) { + buffer := []byte{0} + r, err := NewPayloadReader(999, buffer) + assert.NotNil(t, err) + assert.Nil(t, r) + }) + t.Run("TestGetDataError", func(t *testing.T) { + r := PayloadReader{} + r.colType = 999 + + _, _, err := r.GetDataFromPayload() + assert.NotNil(t, err) + + _, _, err = r.GetDataFromPayload(1) + assert.NotNil(t, err) + + _, _, err = r.GetDataFromPayload(1, 1) + assert.NotNil(t, err) + }) + t.Run("TestGetBoolError", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Int8) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddInt8ToPayload([]int8{1, 2, 3}) + assert.Nil(t, err) + + err = w.FinishPayloadWriter() + assert.Nil(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.Nil(t, err) + + r, err := NewPayloadReader(schemapb.DataType_Bool, buffer) + assert.Nil(t, err) + + _, err = r.GetBoolFromPayload() + assert.NotNil(t, err) + + r.colType = 999 + _, err = r.GetBoolFromPayload() + assert.NotNil(t, err) + }) + t.Run("TestGetInt8Error", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.Nil(t, err) + + err = w.FinishPayloadWriter() + assert.Nil(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.Nil(t, err) + + r, err := NewPayloadReader(schemapb.DataType_Int8, buffer) + assert.Nil(t, err) + + _, err = r.GetInt8FromPayload() + assert.NotNil(t, err) + + r.colType = 999 + _, err = r.GetInt8FromPayload() + assert.NotNil(t, err) + }) + t.Run("TestGetInt16Error", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.Nil(t, err) + + err = w.FinishPayloadWriter() + assert.Nil(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.Nil(t, err) + + r, err := NewPayloadReader(schemapb.DataType_Int16, buffer) + assert.Nil(t, err) + + _, err = r.GetInt16FromPayload() + assert.NotNil(t, err) + + r.colType = 999 + _, err = r.GetInt16FromPayload() + assert.NotNil(t, err) + }) + t.Run("TestGetInt32Error", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.Nil(t, err) + + err = w.FinishPayloadWriter() + assert.Nil(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.Nil(t, err) + + r, err := NewPayloadReader(schemapb.DataType_Int32, buffer) + assert.Nil(t, err) + + _, err = r.GetInt32FromPayload() + assert.NotNil(t, err) + + r.colType = 999 + _, err = r.GetInt32FromPayload() + assert.NotNil(t, err) + }) + t.Run("TestGetInt64Error", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.Nil(t, err) + + err = w.FinishPayloadWriter() + assert.Nil(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.Nil(t, err) + + r, err := NewPayloadReader(schemapb.DataType_Int64, buffer) + assert.Nil(t, err) + + _, err = r.GetInt64FromPayload() + assert.NotNil(t, err) + + r.colType = 999 + _, err = r.GetInt64FromPayload() + assert.NotNil(t, err) + }) + t.Run("TestGetFloatError", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.Nil(t, err) + + err = w.FinishPayloadWriter() + assert.Nil(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.Nil(t, err) + + r, err := NewPayloadReader(schemapb.DataType_Float, buffer) + assert.Nil(t, err) + + _, err = r.GetFloatFromPayload() + assert.NotNil(t, err) + + r.colType = 999 + _, err = r.GetFloatFromPayload() + assert.NotNil(t, err) + }) + t.Run("TestGetDoubleError", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.Nil(t, err) + + err = w.FinishPayloadWriter() + assert.Nil(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.Nil(t, err) + + r, err := NewPayloadReader(schemapb.DataType_Double, buffer) + assert.Nil(t, err) + + _, err = r.GetDoubleFromPayload() + assert.NotNil(t, err) + + r.colType = 999 + _, err = r.GetDoubleFromPayload() + assert.NotNil(t, err) + }) + t.Run("TestGetStringError", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.Nil(t, err) + + err = w.FinishPayloadWriter() + assert.Nil(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.Nil(t, err) + + r, err := NewPayloadReader(schemapb.DataType_String, buffer) + assert.Nil(t, err) + + _, err = r.GetOneStringFromPayload(0) + assert.NotNil(t, err) + + r.colType = 999 + _, err = r.GetOneStringFromPayload(0) + assert.NotNil(t, err) + }) + t.Run("TestGetBinaryVectorError", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.Nil(t, err) + + err = w.FinishPayloadWriter() + assert.Nil(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.Nil(t, err) + + r, err := NewPayloadReader(schemapb.DataType_BinaryVector, buffer) + assert.Nil(t, err) + + _, _, err = r.GetBinaryVectorFromPayload() + assert.NotNil(t, err) + + r.colType = 999 + _, _, err = r.GetBinaryVectorFromPayload() + assert.NotNil(t, err) + }) + t.Run("TestGetFloatVectorError", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.Nil(t, err) + + err = w.FinishPayloadWriter() + assert.Nil(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.Nil(t, err) + + r, err := NewPayloadReader(schemapb.DataType_FloatVector, buffer) + assert.Nil(t, err) + + _, _, err = r.GetFloatVectorFromPayload() + assert.NotNil(t, err) + + r.colType = 999 + _, _, err = r.GetFloatVectorFromPayload() + assert.NotNil(t, err) + }) } diff --git a/internal/storage/print_binglog_test.go b/internal/storage/print_binglog_test.go index 376dd3194c..0f8f62e6bf 100644 --- a/internal/storage/print_binglog_test.go +++ b/internal/storage/print_binglog_test.go @@ -16,8 +16,7 @@ import ( ) func TestPrintBinlogFilesInt64(t *testing.T) { - w, err := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) - assert.Nil(t, err) + w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40) curTS := time.Now().UnixNano() / int64(time.Millisecond) @@ -300,6 +299,7 @@ func TestPrintBinlogFiles(t *testing.T) { err = fd.Close() assert.Nil(t, err) } + binlogFiles = append(binlogFiles, "test") PrintBinlogFiles(binlogFiles) } diff --git a/internal/storage/unsafe_test.go b/internal/storage/unsafe_test.go new file mode 100644 index 0000000000..f9a422b11f --- /dev/null +++ b/internal/storage/unsafe_test.go @@ -0,0 +1,33 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUnsafe(t *testing.T) { + buf := []byte{16} + int8Res := UnsafeReadInt8(buf, 0) + assert.Equal(t, int8Res, int8(16)) + + buf = []byte{16, 16} + int16Res := UnsafeReadInt16(buf, 0) + assert.Equal(t, int16Res, int16(4112)) + + buf = []byte{16, 16, 16, 16} + int32Res := UnsafeReadInt32(buf, 0) + assert.Equal(t, int32Res, int32(269488144)) + + buf = []byte{16, 16, 16, 16, 16, 16, 16, 16} + int64Res := UnsafeReadInt64(buf, 0) + assert.Equal(t, int64Res, int64(1157442765409226768)) + + buf = []byte{16, 16, 16, 16} + float32Res := UnsafeReadFloat32(buf, 0) + assert.Equal(t, float32Res, float32(2.8411367e-29)) + + buf = []byte{16, 16, 16, 16, 16, 16, 16, 16} + float64Res := UnsafeReadFloat64(buf, 0) + assert.Equal(t, float64Res, float64(2.586563270614692e-231)) +}