mirror of https://github.com/milvus-io/milvus.git
Increase compatibility for EstimateMemorySize interface (#10603)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/10661/head
parent
0cedd9d47e
commit
5b42a3223c
|
@ -34,6 +34,7 @@ type BaseKV interface {
|
|||
type DataKV interface {
|
||||
BaseKV
|
||||
LoadPartial(key string, start, end int64) ([]byte, error)
|
||||
GetSize(key string) (int64, error)
|
||||
}
|
||||
|
||||
// TxnKV contains extra txn operations of kv. The extra operations is transactional.
|
||||
|
|
|
@ -212,3 +212,12 @@ func (kv *MemoryKV) LoadPartial(key string, start, end int64) ([]byte, error) {
|
|||
start, end)
|
||||
}
|
||||
}
|
||||
|
||||
func (kv *MemoryKV) GetSize(key string) (int64, error) {
|
||||
value, err := kv.Load(key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return int64(len(value)), nil
|
||||
}
|
||||
|
|
|
@ -56,3 +56,23 @@ func TestMemoryKV_LoadPartial(t *testing.T) {
|
|||
_, err = memKV.LoadPartial(key, start, end)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestMemoryKV_GetSize(t *testing.T) {
|
||||
memKV := NewMemoryKV()
|
||||
|
||||
key := "TestMemoryKV_GetSize_key"
|
||||
value := "TestMemoryKV_GetSize_value"
|
||||
|
||||
err := memKV.Save(key, value)
|
||||
assert.NoError(t, err)
|
||||
|
||||
size, err := memKV.GetSize(key)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, size, int64(len(value)))
|
||||
|
||||
key2 := "TestMemoryKV_GetSize_key2"
|
||||
|
||||
size, err = memKV.GetSize(key2)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(0), size)
|
||||
}
|
||||
|
|
|
@ -272,6 +272,15 @@ func (kv *MinIOKV) LoadPartial(key string, start, end int64) ([]byte, error) {
|
|||
return ioutil.ReadAll(object)
|
||||
}
|
||||
|
||||
func (kv *MinIOKV) GetSize(key string) (int64, error) {
|
||||
objectInfo, err := kv.minioClient.StatObject(kv.ctx, kv.bucketName, key, minio.StatObjectOptions{})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return objectInfo.Size, nil
|
||||
}
|
||||
|
||||
func (kv *MinIOKV) Close() {
|
||||
|
||||
}
|
||||
|
|
|
@ -334,3 +334,31 @@ func TestMinIOKV_FGetObjects(t *testing.T) {
|
|||
defer file1.Close()
|
||||
defer os.Remove(path + name2)
|
||||
}
|
||||
|
||||
func TestMinIOKV_GetSize(t *testing.T) {
|
||||
Params.Init()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
bucketName := "fantastic-tech-test"
|
||||
minIOKV, err := newMinIOKVClient(ctx, bucketName)
|
||||
assert.Nil(t, err)
|
||||
defer minIOKV.RemoveWithPrefix("")
|
||||
|
||||
key := "TestMinIOKV_GetSize_key"
|
||||
value := "TestMinIOKV_GetSize_value"
|
||||
|
||||
err = minIOKV.Save(key, value)
|
||||
assert.NoError(t, err)
|
||||
|
||||
size, err := minIOKV.GetSize(key)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, size, int64(len(value)))
|
||||
|
||||
key2 := "TestMemoryKV_GetSize_key2"
|
||||
|
||||
size, err = minIOKV.GetSize(key2)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, int64(0), size)
|
||||
}
|
||||
|
|
|
@ -152,7 +152,10 @@ func (loader *indexLoader) estimateIndexBinlogSize(segment *Segment, fieldID Fie
|
|||
for _, p := range indexPaths {
|
||||
logSize, err := storage.EstimateMemorySize(loader.kv, p)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
logSize, err = storage.GetBinlogSize(loader.kv, p)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
indexSize += logSize
|
||||
}
|
||||
|
|
|
@ -407,7 +407,10 @@ func (loader *segmentLoader) estimateSegmentSize(segment *Segment,
|
|||
for _, binlogPath := range fb.Binlogs {
|
||||
logSize, err := storage.EstimateMemorySize(loader.minioKV, binlogPath)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
logSize, err = storage.GetBinlogSize(loader.minioKV, binlogPath)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
segmentSize += logSize
|
||||
}
|
||||
|
|
|
@ -26,44 +26,8 @@ import (
|
|||
// key not in binlog format, size = (a not accurate number), error != nil;
|
||||
// failed to read event reader, size = (a not accurate number), error != nil;
|
||||
func GetBinlogSize(kv kv.DataKV, key string) (int64, error) {
|
||||
total := int64(0)
|
||||
|
||||
header := &baseEventHeader{}
|
||||
headerSize := binary.Size(header)
|
||||
|
||||
startPos := binary.Size(MagicNumber)
|
||||
endPos := startPos + headerSize
|
||||
|
||||
for {
|
||||
headerContent, err := kv.LoadPartial(key, int64(startPos), int64(endPos))
|
||||
if err != nil {
|
||||
// case 1: key not exist, total = 0;
|
||||
// case 2: all events have been read, total = (length of all events);
|
||||
// whatever the case is, the return value is reasonable.
|
||||
return total, nil
|
||||
}
|
||||
|
||||
buffer := bytes.NewBuffer(headerContent)
|
||||
|
||||
header, err := readEventHeader(buffer)
|
||||
if err != nil {
|
||||
// FIXME(dragondriver): should we return 0 here?
|
||||
return total, fmt.Errorf("failed to read event reader: %v", err)
|
||||
}
|
||||
|
||||
if header.EventLength <= 0 || header.NextPosition < int32(endPos) {
|
||||
// key not in binlog format
|
||||
// FIXME(dragondriver): should we return 0 here?
|
||||
return total, fmt.Errorf("key not in binlog format")
|
||||
}
|
||||
|
||||
total += int64(header.EventLength)
|
||||
// startPos = startPos + int(header.EventLength)
|
||||
// ||
|
||||
// \/
|
||||
startPos = int(header.NextPosition)
|
||||
endPos = startPos + headerSize
|
||||
}
|
||||
return kv.GetSize(key)
|
||||
}
|
||||
|
||||
// EstimateMemorySize get approximate memory size of a binlog file.
|
||||
|
|
|
@ -45,6 +45,10 @@ func (kv *mockLessHeaderDataKV) LoadPartial(key string, start, end int64) ([]byt
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (kv *mockLessHeaderDataKV) GetSize(key string) (int64, error) {
|
||||
return 0, errors.New("less header")
|
||||
}
|
||||
|
||||
func newMockLessHeaderDataKV() *mockLessHeaderDataKV {
|
||||
return &mockLessHeaderDataKV{}
|
||||
}
|
||||
|
@ -65,6 +69,10 @@ func (kv *mockWrongHeaderDataKV) LoadPartial(key string, start, end int64) ([]by
|
|||
return buffer.Bytes(), nil
|
||||
}
|
||||
|
||||
func (kv *mockWrongHeaderDataKV) GetSize(key string) (int64, error) {
|
||||
return 0, errors.New("wrong header")
|
||||
}
|
||||
|
||||
func newMockWrongHeaderDataKV() kv.DataKV {
|
||||
return &mockWrongHeaderDataKV{}
|
||||
}
|
||||
|
@ -121,7 +129,7 @@ func TestGetBinlogSize(t *testing.T) {
|
|||
|
||||
size, err = GetBinlogSize(memoryKV, blob.Key)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, size+int64(binary.Size(MagicNumber)), int64(len(blob.Value)))
|
||||
assert.Equal(t, size, int64(len(blob.Value)))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -243,6 +251,10 @@ func (kv *mockFailedToGetDescDataKV) LoadPartial(key string, start, end int64) (
|
|||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func (kv *mockFailedToGetDescDataKV) GetSize(key string) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func newMockFailedToGetDescDataKV() *mockFailedToGetDescDataKV {
|
||||
return &mockFailedToGetDescDataKV{}
|
||||
}
|
||||
|
@ -282,6 +294,10 @@ func (kv *mockLessDescDataKV) LoadPartial(key string, start, end int64) ([]byte,
|
|||
*/
|
||||
}
|
||||
|
||||
func (kv *mockLessDescDataKV) GetSize(key string) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func newMockLessDescDataKV() *mockLessDescDataKV {
|
||||
return &mockLessDescDataKV{}
|
||||
}
|
||||
|
@ -307,6 +323,10 @@ func (kv *mockOriginalSizeDataKV) LoadPartial(key string, start, end int64) ([]b
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (kv *mockOriginalSizeDataKV) GetSize(key string) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func newMockOriginalSizeDataKV() *mockOriginalSizeDataKV {
|
||||
return &mockOriginalSizeDataKV{}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue