Dock build index task with storage & indexbuilder

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/4973/head^2
dragondriver 2020-12-22 00:14:36 +00:00 committed by yefu.chen
parent 05918f55d2
commit adfb5187cf
7 changed files with 243 additions and 6 deletions

View File

@ -19,6 +19,7 @@ func (b *Builder) BuildIndex(ctx context.Context, request *indexbuilderpb.BuildI
t.idAllocator = b.idAllocator
t.buildQueue = b.sched.IndexBuildQueue
t.table = b.metaTable
t.kv = b.kv
var cancel func()
t.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()

View File

@ -9,9 +9,15 @@ import (
"sync"
"time"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.etcd.io/etcd/clientv3"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/kv"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
@ -31,6 +37,8 @@ type Builder struct {
idAllocator *allocator.IDAllocator
kv kv.Base
metaTable *metaTable
// Add callback functions at different stages
startCallbacks []func()
@ -63,12 +71,22 @@ func CreateBuilder(ctx context.Context) (*Builder, error) {
idAllocator, err := allocator.NewIDAllocator(b.loopCtx, Params.MasterAddress)
minIOEndPoint := Params.MinIOAddress
minIOAccessKeyID := Params.MinIOAccessKeyID
minIOSecretAccessKey := Params.MinIOSecretAccessKey
minIOUseSSL := Params.MinIOUseSSL
minIOClient, err := minio.New(minIOEndPoint, &minio.Options{
Creds: credentials.NewStaticV4(minIOAccessKeyID, minIOSecretAccessKey, ""),
Secure: minIOUseSSL,
})
b.kv, err = miniokv.NewMinIOKV(b.loopCtx, minIOClient, "milvus-distributed-indexbuilder")
if err != nil {
return nil, err
}
b.idAllocator = idAllocator
b.sched, err = NewTaskScheduler(b.loopCtx, b.idAllocator, b.metaTable)
b.sched, err = NewTaskScheduler(b.loopCtx, b.idAllocator, b.kv, b.metaTable)
if err != nil {
return nil, err
}

View File

@ -17,6 +17,12 @@ type ParamTable struct {
EtcdAddress string
MetaRootPath string
MinIOAddress string
MinIOPort int
MinIOAccessKeyID string
MinIOSecretAccessKey string
MinIOUseSSL bool
}
var Params ParamTable
@ -86,3 +92,49 @@ func (pt *ParamTable) initMasterAddress() {
}
pt.MasterAddress = ret
}
func (pt *ParamTable) initMinIOAddress() {
ret, err := pt.Load("_MinioAddress")
if err != nil {
panic(err)
}
pt.MinIOAddress = ret
}
func (pt *ParamTable) initMinIOPort() {
ret, err := pt.Load("_MinIOPort")
if err != nil {
panic(err)
}
pt.MinIOPort, err = strconv.Atoi(ret)
if err != nil {
panic(err)
}
}
func (pt *ParamTable) initMinIOAccessKeyID() {
ret, err := pt.Load("minio.accessKeyID")
if err != nil {
panic(err)
}
pt.MinIOAccessKeyID = ret
}
func (pt *ParamTable) initMinIOSecretAccessKey() {
ret, err := pt.Load("minio.secretAccessKey")
if err != nil {
panic(err)
}
pt.MinIOSecretAccessKey = ret
}
func (pt *ParamTable) initMinIOUseSSL() {
ret, err := pt.Load("minio.useSSL")
if err != nil {
panic(err)
}
pt.MinIOUseSSL, err = strconv.ParseBool(ret)
if err != nil {
panic(err)
}
}

View File

@ -24,3 +24,28 @@ func TestParamTable_MetaRootPath(t *testing.T) {
path := Params.MetaRootPath
assert.Equal(t, path, "by-dev/meta")
}
func TestParamTable_MinIOAddress(t *testing.T) {
address := Params.MinIOAccessKeyID
assert.Equal(t, address, "localhost")
}
func TestParamTable_MinIOPort(t *testing.T) {
port := Params.MinIOPort
assert.Equal(t, port, 9000)
}
func TestParamTable_MinIOAccessKeyID(t *testing.T) {
accessKeyID := Params.MinIOAccessKeyID
assert.Equal(t, accessKeyID, "minioadmin")
}
func TestParamTable_MinIOSecretAccessKey(t *testing.T) {
secretAccessKey := Params.MinIOSecretAccessKey
assert.Equal(t, secretAccessKey, "minioadmin")
}
func TestParamTable_MinIOUseSSL(t *testing.T) {
useSSL := Params.MinIOUseSSL
assert.Equal(t, useSSL, false)
}

View File

@ -3,11 +3,14 @@ package indexbuilder
import (
"context"
"log"
"strconv"
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb"
"github.com/zilliztech/milvus-distributed/internal/storage"
)
type task interface {
@ -55,6 +58,7 @@ type IndexAddTask struct {
indexID UniqueID
idAllocator *allocator.IDAllocator
buildQueue TaskQueue
kv kv.Base
}
func (it *IndexAddTask) SetID(ID UniqueID) {
@ -83,6 +87,7 @@ func (it *IndexAddTask) Execute() error {
t := newIndexBuildTask()
t.table = it.table
t.indexID = it.indexID
t.kv = it.kv
var cancel func()
t.ctx, cancel = context.WithTimeout(it.ctx, reqTimeoutInterval)
defer cancel()
@ -112,7 +117,10 @@ func NewIndexAddTask() *IndexAddTask {
type IndexBuildTask struct {
BaseTask
index Index
indexID UniqueID
kv kv.Base
savePaths []string
indexMeta *indexbuilderpb.IndexMeta
}
@ -141,12 +149,129 @@ func (it *IndexBuildTask) Execute() error {
if err != nil {
return err
}
time.Sleep(time.Second)
log.Println("Pretend to Execute for 1 second")
return nil
typeParams := make(map[string]string)
for _, kvPair := range it.indexMeta.Req.GetTypeParams() {
key, value := kvPair.GetKey(), kvPair.GetValue()
_, ok := typeParams[key]
if ok {
return errors.New("duplicated key in type params")
}
typeParams[key] = value
}
indexParams := make(map[string]string)
for _, kvPair := range it.indexMeta.Req.GetIndexParams() {
key, value := kvPair.GetKey(), kvPair.GetValue()
_, ok := indexParams[key]
if ok {
return errors.New("duplicated key in index params")
}
indexParams[key] = value
}
it.index, err = NewCIndex(typeParams, indexParams)
if err != nil {
return err
}
getKeyByPathNaive := func(path string) string {
// splitElements := strings.Split(path, "/")
// return splitElements[len(splitElements)-1]
return path
}
getValueByPath := func(path string) ([]byte, error) {
data, err := it.kv.Load(path)
if err != nil {
return nil, err
}
return []byte(data), nil
}
getBlobByPath := func(path string) (*Blob, error) {
value, err := getValueByPath(path)
if err != nil {
return nil, err
}
return &Blob{
Key: getKeyByPathNaive(path),
Value: value,
}, nil
}
getStorageBlobs := func(blobs []*Blob) []*storage.Blob {
// when storage.Blob.Key & storage.Blob.Value is visible,
// use `return blobs`
ret := make([]*storage.Blob, 0)
for _, blob := range blobs {
ret = append(ret, storage.NewBlob(blob.Key, blob.Value))
}
return ret
}
toLoadDataPaths := it.indexMeta.Req.GetDataPaths()
keys := make([]string, 0)
blobs := make([]*Blob, 0)
for _, path := range toLoadDataPaths {
keys = append(keys, getKeyByPathNaive(path))
blob, err := getBlobByPath(path)
if err != nil {
return err
}
blobs = append(blobs, blob)
}
storageBlobs := getStorageBlobs(blobs)
var insertCodec storage.InsertCodec
partitionID, segmentID, insertData, err := insertCodec.Deserialize(storageBlobs)
if len(insertData.Data) != 1 {
return errors.New("we expect only one field in deserialized insert data")
}
for _, value := range insertData.Data {
// TODO: BinaryVectorFieldData
floatVectorFieldData, ok := value.(*storage.FloatVectorFieldData)
if !ok {
return errors.New("we expect FloatVectorFieldData or BinaryVectorFieldData")
}
err = it.index.BuildFloatVecIndex(floatVectorFieldData.Data)
if err != nil {
return err
}
indexBlobs, err := it.index.Serialize()
if err != nil {
return err
}
var indexCodec storage.IndexCodec
serializedIndexBlobs, err := indexCodec.Serialize(getStorageBlobs(indexBlobs))
if err != nil {
return err
}
getSavePathByKey := func(key string) string {
// TODO: fix me, use more reasonable method
return strconv.Itoa(int(it.indexID)) + "/" + strconv.Itoa(int(partitionID)) + "/" + strconv.Itoa(int(segmentID)) + "/" + key
}
saveBlob := func(path string, value []byte) error {
return it.kv.Save(path, string(value))
}
it.savePaths = make([]string, 0)
for _, blob := range serializedIndexBlobs {
key, value := blob.GetKey(), blob.GetValue()
savePath := getSavePathByKey(key)
err := saveBlob(savePath, value)
if err != nil {
return err
}
it.savePaths = append(it.savePaths, savePath)
}
}
return it.index.Delete()
}
func (it *IndexBuildTask) PostExecute() error {
dataPaths := []string{"file1", "file2"}
return it.table.CompleteIndex(it.indexID, dataPaths)
return it.table.CompleteIndex(it.indexID, it.savePaths)
}

View File

@ -8,6 +8,7 @@ import (
"sync"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/kv"
)
type TaskQueue interface {
@ -169,6 +170,7 @@ type TaskScheduler struct {
idAllocator *allocator.IDAllocator
metaTable *metaTable
kv kv.Base
wg sync.WaitGroup
ctx context.Context
@ -177,11 +179,13 @@ type TaskScheduler struct {
func NewTaskScheduler(ctx context.Context,
idAllocator *allocator.IDAllocator,
kv kv.Base,
table *metaTable) (*TaskScheduler, error) {
ctx1, cancel := context.WithCancel(ctx)
s := &TaskScheduler{
idAllocator: idAllocator,
metaTable: table,
kv: kv,
ctx: ctx1,
cancel: cancel,
}

View File

@ -21,6 +21,18 @@ type Blob struct {
value []byte
}
func NewBlob(key string, value []byte) *Blob {
return &Blob{key, value}
}
func (b Blob) GetKey() string {
return b.key
}
func (b Blob) GetValue() []byte {
return b.value
}
type Base struct {
Version int
CommitID int