mirror of https://github.com/milvus-io/milvus.git
Add minio FGet method (#6386)
* Add minio FGet method Signed-off-by: godchen <qingxiang.chen@zilliz.com> * go fmt Signed-off-by: godchen <qingxiang.chen@zilliz.com> * change el append Signed-off-by: godchen <qingxiang.chen@zilliz.com>pull/6400/head
parent
88078eb212
commit
346e9cba4a
|
@ -66,6 +66,9 @@ dataCoord:
|
|||
dataNode:
|
||||
port: 21124
|
||||
|
||||
storage:
|
||||
path: /var/lib/milvus/data/
|
||||
|
||||
log:
|
||||
level: debug # info, warn, error, panic, fatal
|
||||
file:
|
||||
|
|
|
@ -13,17 +13,13 @@ package miniokv
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"time"
|
||||
"sync"
|
||||
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/performance"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
|
@ -121,6 +117,39 @@ func (kv *MinIOKV) Load(key string) (string, error) {
|
|||
return buf.String(), nil
|
||||
}
|
||||
|
||||
// FGetObject download file from minio to local storage system.
|
||||
func (kv *MinIOKV) FGetObject(key, localPath string) error {
|
||||
err := kv.minioClient.FGetObject(kv.ctx, kv.bucketName, key, localPath+key, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FGetObjects download file from minio to local storage system.
|
||||
// For parallell downloads file, n goroutines will be started to download n keys.
|
||||
func (kv *MinIOKV) FGetObjects(keys []string, localPath string) error {
|
||||
var wg sync.WaitGroup
|
||||
el := make(errorList, len(keys))
|
||||
for i, key := range keys {
|
||||
wg.Add(1)
|
||||
go func(i int, key string) {
|
||||
err := kv.minioClient.FGetObject(kv.ctx, kv.bucketName, key, localPath+key, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
el[i] = err
|
||||
}
|
||||
wg.Done()
|
||||
}(i, key)
|
||||
}
|
||||
wg.Wait()
|
||||
for _, err := range el {
|
||||
if err != nil {
|
||||
return el
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kv *MinIOKV) MultiLoad(keys []string) ([]string, error) {
|
||||
var resultErr error
|
||||
var objectsValues []string
|
||||
|
@ -202,46 +231,13 @@ func (kv *MinIOKV) Close() {
|
|||
|
||||
}
|
||||
|
||||
type Case struct {
|
||||
Name string
|
||||
BlockSize int // unit: byte
|
||||
Speed float64 // unit: MB/s
|
||||
}
|
||||
type errorList []error
|
||||
|
||||
type Test struct {
|
||||
Name string
|
||||
Cases []Case
|
||||
}
|
||||
|
||||
func (kv *MinIOKV) performanceTest(toFile bool, totalBytes int) {
|
||||
r := rand.Int()
|
||||
results := Test{Name: "MinIO performance"}
|
||||
for i := 0; i < 10; i += 2 {
|
||||
data := performance.GenerateData(2*1024, float64(9-i))
|
||||
startT := time.Now()
|
||||
for j := 0; j < totalBytes/(len(data)); j++ {
|
||||
kv.Save(fmt.Sprintf("performance-rand%d-test-%d-%d", r, i, j), data)
|
||||
}
|
||||
tc := time.Since(startT)
|
||||
results.Cases = append(results.Cases, Case{Name: "write", BlockSize: len(data), Speed: 16.0 / tc.Seconds()})
|
||||
|
||||
startT = time.Now()
|
||||
for j := 0; j < totalBytes/(len(data)); j++ {
|
||||
kv.Load(fmt.Sprintf("performance-rand%d-test-%d-%d", r, i, j))
|
||||
}
|
||||
tc = time.Since(startT)
|
||||
results.Cases = append(results.Cases, Case{Name: "read", BlockSize: len(data), Speed: 16.0 / tc.Seconds()})
|
||||
}
|
||||
kv.RemoveWithPrefix(fmt.Sprintf("performance-rand%d", r))
|
||||
mb, err := json.Marshal(results)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
log.Debug(string(mb))
|
||||
if toFile {
|
||||
err = ioutil.WriteFile(fmt.Sprintf("./%d", r), mb, 0644)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
func (el errorList) Error() string {
|
||||
var builder strings.Builder
|
||||
builder.WriteString("All downloads results:\n")
|
||||
for index, err := range el {
|
||||
builder.WriteString(fmt.Sprintf("downloads #%d:%s\n", index+1, err.Error()))
|
||||
}
|
||||
return builder.String()
|
||||
}
|
||||
|
|
|
@ -13,6 +13,8 @@ package miniokv_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
|
@ -186,3 +188,94 @@ func TestMinIOKV_Remove(t *testing.T) {
|
|||
assert.Error(t, err)
|
||||
assert.Empty(t, val)
|
||||
}
|
||||
|
||||
func TestMinIOKV_FGetObject(t *testing.T) {
|
||||
Params.Init()
|
||||
path := "/tmp/milvus/data"
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
bucketName := "fantastic-tech-test"
|
||||
MinIOKV, err := newMinIOKVClient(ctx, bucketName)
|
||||
assert.Nil(t, err)
|
||||
defer MinIOKV.RemoveWithPrefix("")
|
||||
|
||||
name1 := "31280791048324/4325023534/53443534/key_1"
|
||||
value1 := "123"
|
||||
err = MinIOKV.Save(name1, value1)
|
||||
assert.Nil(t, err)
|
||||
name2 := "312895849354/31205934503459/18948129301/key_2"
|
||||
value2 := "333"
|
||||
err = MinIOKV.Save(name2, value2)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = MinIOKV.FGetObject(name1, path)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = MinIOKV.FGetObject(name2, path)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = MinIOKV.FGetObject("fail", path)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
file1, err := os.Open(path + name1)
|
||||
assert.Nil(t, err)
|
||||
content1, err := ioutil.ReadAll(file1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, value1, string(content1))
|
||||
defer file1.Close()
|
||||
defer os.Remove(path + name1)
|
||||
|
||||
file2, err := os.Open(path + name2)
|
||||
assert.Nil(t, err)
|
||||
content2, err := ioutil.ReadAll(file2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, value2, string(content2))
|
||||
defer file1.Close()
|
||||
defer os.Remove(path + name2)
|
||||
}
|
||||
|
||||
func TestMinIOKV_FGetObjects(t *testing.T) {
|
||||
Params.Init()
|
||||
path := "/tmp/milvus/data"
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
bucketName := "fantastic-tech-test"
|
||||
MinIOKV, err := newMinIOKVClient(ctx, bucketName)
|
||||
assert.Nil(t, err)
|
||||
defer MinIOKV.RemoveWithPrefix("")
|
||||
|
||||
name1 := "31280791048324/4325023534/53443534/key_1"
|
||||
value1 := "123"
|
||||
err = MinIOKV.Save(name1, value1)
|
||||
assert.Nil(t, err)
|
||||
name2 := "312895849354/31205934503459/18948129301/key_2"
|
||||
value2 := "333"
|
||||
err = MinIOKV.Save(name2, value2)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = MinIOKV.FGetObjects([]string{name1, name2}, path)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = MinIOKV.FGetObjects([]string{"fail1", "fail2"}, path)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
file1, err := os.Open(path + name1)
|
||||
assert.Nil(t, err)
|
||||
content1, err := ioutil.ReadAll(file1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, value1, string(content1))
|
||||
defer file1.Close()
|
||||
defer os.Remove(path + name1)
|
||||
|
||||
file2, err := os.Open(path + name2)
|
||||
assert.Nil(t, err)
|
||||
content2, err := ioutil.ReadAll(file2)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, value2, string(content2))
|
||||
defer file1.Close()
|
||||
defer os.Remove(path + name2)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue