mirror of https://github.com/milvus-io/milvus.git
fix: descriptor event in previous version not has nullable to parse error (#34235)
#34176 --------- Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>pull/34301/head
parent
ff51c7e628
commit
ef3ced8138
|
@ -35,6 +35,7 @@ const milvus::FieldId TimestampFieldID = milvus::FieldId(1);
|
|||
// fill followed extra info to binlog file
|
||||
const char ORIGIN_SIZE_KEY[] = "original_size";
|
||||
const char INDEX_BUILD_ID_KEY[] = "indexBuildID";
|
||||
const char NULLABLE[] = "nullable";
|
||||
|
||||
const char INDEX_ROOT_PATH[] = "index_files";
|
||||
const char RAWDATA_ROOT_PATH[] = "raw_datas";
|
||||
|
|
|
@ -79,7 +79,8 @@ DeserializeRemoteFileData(BinlogReaderPtr reader) {
|
|||
auto& extras = descriptor_event.event_data.extras;
|
||||
AssertInfo(extras.find(INDEX_BUILD_ID_KEY) != extras.end(),
|
||||
"index build id not exist");
|
||||
index_meta.build_id = std::stol(extras[INDEX_BUILD_ID_KEY]);
|
||||
index_meta.build_id = std::stol(
|
||||
std::any_cast<std::string>(extras[INDEX_BUILD_ID_KEY]));
|
||||
index_data->set_index_meta(index_meta);
|
||||
index_data->SetTimestamps(index_event_data.start_timestamp,
|
||||
index_event_data.end_timestamp);
|
||||
|
|
|
@ -14,6 +14,9 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <glog/logging.h>
|
||||
#include <any>
|
||||
#include <string>
|
||||
#include "common/Array.h"
|
||||
#include "common/Consts.h"
|
||||
#include "common/EasyAssert.h"
|
||||
|
@ -24,6 +27,7 @@
|
|||
#include "storage/Event.h"
|
||||
#include "storage/PayloadReader.h"
|
||||
#include "storage/PayloadWriter.h"
|
||||
#include "log/Log.h"
|
||||
|
||||
namespace milvus::storage {
|
||||
|
||||
|
@ -34,7 +38,7 @@ GetFixPartSize(DescriptorEventData& data) {
|
|||
sizeof(data.fix_part.segment_id) + sizeof(data.fix_part.field_id) +
|
||||
sizeof(data.fix_part.start_timestamp) +
|
||||
sizeof(data.fix_part.end_timestamp) +
|
||||
sizeof(data.fix_part.data_type) + sizeof(data.fix_part.nullable);
|
||||
sizeof(data.fix_part.data_type);
|
||||
}
|
||||
int
|
||||
GetFixPartSize(BaseEventData& data) {
|
||||
|
@ -107,8 +111,6 @@ DescriptorEventDataFixPart::DescriptorEventDataFixPart(BinlogReaderPtr reader) {
|
|||
assert(ast.ok());
|
||||
ast = reader->Read(sizeof(field_id), &field_id);
|
||||
assert(ast.ok());
|
||||
ast = reader->Read(sizeof(nullable), &nullable);
|
||||
assert(ast.ok());
|
||||
ast = reader->Read(sizeof(start_timestamp), &start_timestamp);
|
||||
assert(ast.ok());
|
||||
ast = reader->Read(sizeof(end_timestamp), &end_timestamp);
|
||||
|
@ -122,7 +124,7 @@ DescriptorEventDataFixPart::Serialize() {
|
|||
auto fix_part_size = sizeof(collection_id) + sizeof(partition_id) +
|
||||
sizeof(segment_id) + sizeof(field_id) +
|
||||
sizeof(start_timestamp) + sizeof(end_timestamp) +
|
||||
sizeof(data_type) + sizeof(nullable);
|
||||
sizeof(data_type);
|
||||
std::vector<uint8_t> res(fix_part_size);
|
||||
int offset = 0;
|
||||
memcpy(res.data() + offset, &collection_id, sizeof(collection_id));
|
||||
|
@ -133,8 +135,6 @@ DescriptorEventDataFixPart::Serialize() {
|
|||
offset += sizeof(segment_id);
|
||||
memcpy(res.data() + offset, &field_id, sizeof(field_id));
|
||||
offset += sizeof(field_id);
|
||||
memcpy(res.data() + offset, &nullable, sizeof(nullable));
|
||||
offset += sizeof(nullable);
|
||||
memcpy(res.data() + offset, &start_timestamp, sizeof(start_timestamp));
|
||||
offset += sizeof(start_timestamp);
|
||||
memcpy(res.data() + offset, &end_timestamp, sizeof(end_timestamp));
|
||||
|
@ -163,10 +163,15 @@ DescriptorEventData::DescriptorEventData(BinlogReaderPtr reader) {
|
|||
nlohmann::json json =
|
||||
nlohmann::json::parse(extra_bytes.begin(), extra_bytes.end());
|
||||
if (json.contains(ORIGIN_SIZE_KEY)) {
|
||||
extras[ORIGIN_SIZE_KEY] = json[ORIGIN_SIZE_KEY];
|
||||
extras[ORIGIN_SIZE_KEY] =
|
||||
static_cast<std::string>(json[ORIGIN_SIZE_KEY]);
|
||||
}
|
||||
if (json.contains(INDEX_BUILD_ID_KEY)) {
|
||||
extras[INDEX_BUILD_ID_KEY] = json[INDEX_BUILD_ID_KEY];
|
||||
extras[INDEX_BUILD_ID_KEY] =
|
||||
static_cast<std::string>(json[INDEX_BUILD_ID_KEY]);
|
||||
}
|
||||
if (json.contains(NULLABLE)) {
|
||||
extras[NULLABLE] = static_cast<bool>(json[NULLABLE]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -175,7 +180,11 @@ DescriptorEventData::Serialize() {
|
|||
auto fix_part_data = fix_part.Serialize();
|
||||
nlohmann::json extras_json;
|
||||
for (auto v : extras) {
|
||||
extras_json.emplace(v.first, v.second);
|
||||
if (v.first == NULLABLE) {
|
||||
extras_json.emplace(v.first, std::any_cast<bool>(v.second));
|
||||
} else {
|
||||
extras_json.emplace(v.first, std::any_cast<std::string>(v.second));
|
||||
}
|
||||
}
|
||||
std::string extras_string = extras_json.dump();
|
||||
extra_length = extras_string.size();
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <any>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
@ -46,8 +47,6 @@ struct DescriptorEventDataFixPart {
|
|||
int64_t partition_id;
|
||||
int64_t segment_id;
|
||||
int64_t field_id;
|
||||
//(todo:smellthemoon) set nullable false temporarily, will change it
|
||||
bool nullable = false;
|
||||
Timestamp start_timestamp;
|
||||
Timestamp end_timestamp;
|
||||
milvus::proto::schema::DataType data_type;
|
||||
|
@ -63,7 +62,7 @@ struct DescriptorEventData {
|
|||
DescriptorEventDataFixPart fix_part;
|
||||
int32_t extra_length;
|
||||
std::vector<uint8_t> extra_bytes;
|
||||
std::unordered_map<std::string, std::string> extras;
|
||||
std::unordered_map<std::string, std::any> extras;
|
||||
std::vector<uint8_t> post_header_lengths;
|
||||
|
||||
DescriptorEventData() = default;
|
||||
|
|
|
@ -61,8 +61,6 @@ InsertData::serialize_to_remote_file() {
|
|||
des_fix_part.start_timestamp = time_range_.first;
|
||||
des_fix_part.end_timestamp = time_range_.second;
|
||||
des_fix_part.data_type = milvus::proto::schema::DataType(data_type);
|
||||
//(todo:smellthemoon) set nullable false temporarily, will change it
|
||||
des_fix_part.nullable = false;
|
||||
for (auto i = int8_t(EventType::DescriptorEvent);
|
||||
i < int8_t(EventType::EventTypeEnd);
|
||||
i++) {
|
||||
|
@ -71,6 +69,7 @@ InsertData::serialize_to_remote_file() {
|
|||
}
|
||||
des_event_data.extras[ORIGIN_SIZE_KEY] =
|
||||
std::to_string(field_data_->Size());
|
||||
//(todo:smellthemoon) set nullable
|
||||
|
||||
auto& des_event_header = descriptor_event.event_header;
|
||||
// TODO :: set timestamp
|
||||
|
|
|
@ -49,8 +49,11 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
|
|||
if reader.eventReader != nil {
|
||||
reader.eventReader.Close()
|
||||
}
|
||||
var err error
|
||||
reader.eventReader, err = newEventReader(reader.descriptorEvent.PayloadDataType, reader.buffer, reader.descriptorEvent.Nullable)
|
||||
nullable, err := reader.descriptorEvent.GetNullable()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reader.eventReader, err = newEventReader(reader.descriptorEvent.PayloadDataType, reader.buffer, nullable)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -123,11 +123,6 @@ func TestInsertBinlog(t *testing.T) {
|
|||
assert.Equal(t, fieldID, int64(40))
|
||||
pos += int(unsafe.Sizeof(fieldID))
|
||||
|
||||
// descriptor data fix, nullable
|
||||
nullable := UnsafeReadBool(buf, pos)
|
||||
assert.Equal(t, nullable, false)
|
||||
pos += int(unsafe.Sizeof(nullable))
|
||||
|
||||
// descriptor data fix, start time stamp
|
||||
startts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, startts, int64(1000))
|
||||
|
@ -379,11 +374,6 @@ func TestDeleteBinlog(t *testing.T) {
|
|||
assert.Equal(t, fieldID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(fieldID))
|
||||
|
||||
// descriptor data fix, nullable
|
||||
nullable := UnsafeReadBool(buf, pos)
|
||||
assert.Equal(t, nullable, false)
|
||||
pos += int(unsafe.Sizeof(nullable))
|
||||
|
||||
// descriptor data fix, start time stamp
|
||||
startts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, startts, int64(1000))
|
||||
|
@ -635,11 +625,6 @@ func TestDDLBinlog1(t *testing.T) {
|
|||
assert.Equal(t, fieldID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(fieldID))
|
||||
|
||||
// descriptor data fix, nullable
|
||||
nullable := UnsafeReadBool(buf, pos)
|
||||
assert.Equal(t, nullable, false)
|
||||
pos += int(unsafe.Sizeof(nullable))
|
||||
|
||||
// descriptor data fix, start time stamp
|
||||
startts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, startts, int64(1000))
|
||||
|
@ -890,11 +875,6 @@ func TestDDLBinlog2(t *testing.T) {
|
|||
assert.Equal(t, fieldID, int64(-1))
|
||||
pos += int(unsafe.Sizeof(fieldID))
|
||||
|
||||
// descriptor data fix, nullable
|
||||
nullable := UnsafeReadBool(buf, pos)
|
||||
assert.Equal(t, nullable, false)
|
||||
pos += int(unsafe.Sizeof(nullable))
|
||||
|
||||
// descriptor data fix, start time stamp
|
||||
startts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, startts, int64(1000))
|
||||
|
@ -1140,11 +1120,6 @@ func TestIndexFileBinlog(t *testing.T) {
|
|||
assert.Equal(t, fieldID, fID)
|
||||
pos += int(unsafe.Sizeof(fID))
|
||||
|
||||
// descriptor data fix, nullable
|
||||
nullable := UnsafeReadBool(buf, pos)
|
||||
assert.Equal(t, nullable, false)
|
||||
pos += int(unsafe.Sizeof(nullable))
|
||||
|
||||
// descriptor data fix, start time stamp
|
||||
startts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, startts, int64(timestamp))
|
||||
|
@ -1274,11 +1249,6 @@ func TestIndexFileBinlogV2(t *testing.T) {
|
|||
assert.Equal(t, fieldID, fID)
|
||||
pos += int(unsafe.Sizeof(fID))
|
||||
|
||||
// descriptor data fix, nullable
|
||||
nullable := UnsafeReadBool(buf, pos)
|
||||
assert.Equal(t, nullable, false)
|
||||
pos += int(unsafe.Sizeof(nullable))
|
||||
|
||||
// descriptor data fix, start time stamp
|
||||
startts := UnsafeReadInt64(buf, pos)
|
||||
assert.Equal(t, startts, int64(timestamp))
|
||||
|
|
|
@ -278,7 +278,8 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
|
|||
descriptorEvent.PartitionID = partitionID
|
||||
descriptorEvent.SegmentID = segmentID
|
||||
descriptorEvent.FieldID = FieldID
|
||||
descriptorEvent.Nullable = nullable
|
||||
// store nullable in extra for compatible
|
||||
descriptorEvent.AddExtra(nullableKey, nullable)
|
||||
|
||||
w := &InsertBinlogWriter{
|
||||
baseBinlogWriter: baseBinlogWriter{
|
||||
|
|
|
@ -27,10 +27,12 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
const originalSizeKey = "original_size"
|
||||
const nullableKey = "nullable"
|
||||
|
||||
type descriptorEventData struct {
|
||||
DescriptorEventDataFixPart
|
||||
|
@ -46,7 +48,6 @@ type DescriptorEventDataFixPart struct {
|
|||
PartitionID int64
|
||||
SegmentID int64
|
||||
FieldID int64
|
||||
Nullable bool
|
||||
StartTimestamp typeutil.Timestamp
|
||||
EndTimestamp typeutil.Timestamp
|
||||
PayloadDataType schemapb.DataType
|
||||
|
@ -63,6 +64,20 @@ func (data *descriptorEventData) GetEventDataFixPartSize() int32 {
|
|||
return int32(binary.Size(data.DescriptorEventDataFixPart))
|
||||
}
|
||||
|
||||
func (data *descriptorEventData) GetNullable() (bool, error) {
|
||||
nullableStore, ok := data.Extras[nullableKey]
|
||||
// previous descriptorEventData not store nullable
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
nullable, ok := nullableStore.(bool)
|
||||
// will not happend, has checked bool format when FinishExtra
|
||||
if !ok {
|
||||
return false, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("value of %v must in bool format", nullableKey))
|
||||
}
|
||||
return nullable, nil
|
||||
}
|
||||
|
||||
// GetMemoryUsageInBytes returns the memory size of DescriptorEventDataFixPart.
|
||||
func (data *descriptorEventData) GetMemoryUsageInBytes() int32 {
|
||||
return data.GetEventDataFixPartSize() + int32(binary.Size(data.PostHeaderLengths)) + int32(binary.Size(data.ExtraLength)) + data.ExtraLength
|
||||
|
@ -94,6 +109,14 @@ func (data *descriptorEventData) FinishExtra() error {
|
|||
return fmt.Errorf("value of %v must be able to be converted into int format", originalSizeKey)
|
||||
}
|
||||
|
||||
nullableStore, existed := data.Extras[nullableKey]
|
||||
if existed {
|
||||
_, ok := nullableStore.(bool)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("value of %v must in bool format", nullableKey))
|
||||
}
|
||||
}
|
||||
|
||||
data.ExtraBytes, err = json.Marshal(data.Extras)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -351,7 +374,6 @@ func newDescriptorEventData() *descriptorEventData {
|
|||
StartTimestamp: 0,
|
||||
EndTimestamp: 0,
|
||||
PayloadDataType: -1,
|
||||
Nullable: false,
|
||||
},
|
||||
PostHeaderLengths: []uint8{},
|
||||
Extras: make(map[string]interface{}),
|
||||
|
|
|
@ -54,11 +54,27 @@ func TestDescriptorEvent(t *testing.T) {
|
|||
err = desc.Write(&buf)
|
||||
assert.Error(t, err)
|
||||
|
||||
// nullable not existed
|
||||
nullable, err := desc.GetNullable()
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, nullable)
|
||||
|
||||
desc.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
|
||||
desc.AddExtra(nullableKey, "not bool format")
|
||||
|
||||
err = desc.Write(&buf)
|
||||
// nullable not formatted
|
||||
assert.Error(t, err)
|
||||
|
||||
desc.AddExtra(nullableKey, true)
|
||||
|
||||
err = desc.Write(&buf)
|
||||
assert.NoError(t, err)
|
||||
|
||||
nullable, err = desc.GetNullable()
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, nullable)
|
||||
|
||||
buffer := buf.Bytes()
|
||||
|
||||
ts := UnsafeReadInt64(buffer, 0)
|
||||
|
@ -89,25 +105,17 @@ func TestDescriptorEvent(t *testing.T) {
|
|||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID)))
|
||||
assert.Equal(t, fieldID, int64(-1))
|
||||
nullable := UnsafeReadBool(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID))+
|
||||
int(unsafe.Sizeof(fieldID)))
|
||||
assert.Equal(t, nullable, false)
|
||||
startTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID))+
|
||||
int(unsafe.Sizeof(fieldID))+
|
||||
int(unsafe.Sizeof(nullable)))
|
||||
int(unsafe.Sizeof(fieldID)))
|
||||
assert.Equal(t, startTs, int64(0))
|
||||
endTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
|
||||
int(unsafe.Sizeof(collID))+
|
||||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID))+
|
||||
int(unsafe.Sizeof(fieldID))+
|
||||
int(unsafe.Sizeof(nullable))+
|
||||
int(unsafe.Sizeof(startTs)))
|
||||
assert.Equal(t, endTs, int64(0))
|
||||
colType := UnsafeReadInt32(buffer, binary.Size(eventHeader{})+
|
||||
|
@ -115,7 +123,6 @@ func TestDescriptorEvent(t *testing.T) {
|
|||
int(unsafe.Sizeof(partID))+
|
||||
int(unsafe.Sizeof(segID))+
|
||||
int(unsafe.Sizeof(fieldID))+
|
||||
int(unsafe.Sizeof(nullable))+
|
||||
int(unsafe.Sizeof(startTs))+
|
||||
int(unsafe.Sizeof(endTs)))
|
||||
assert.Equal(t, colType, int32(-1))
|
||||
|
@ -125,7 +132,6 @@ func TestDescriptorEvent(t *testing.T) {
|
|||
int(unsafe.Sizeof(partID)) +
|
||||
int(unsafe.Sizeof(segID)) +
|
||||
int(unsafe.Sizeof(fieldID)) +
|
||||
int(unsafe.Sizeof(nullable)) +
|
||||
int(unsafe.Sizeof(startTs)) +
|
||||
int(unsafe.Sizeof(endTs)) +
|
||||
int(unsafe.Sizeof(colType))
|
||||
|
|
Loading…
Reference in New Issue