Add writenode to ci

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
pull/4973/head^2
XuanYang-cn 2021-01-05 15:14:59 +08:00 committed by yefu.chen
parent ca89ece576
commit 05f1350e19
8 changed files with 84 additions and 21 deletions

View File

@ -25,6 +25,11 @@ dir ('build/docker/deploy') {
sh 'docker pull ${SOURCE_REPO}/querynode:${SOURCE_TAG} || true'
sh 'docker-compose build --force-rm querynode'
sh 'docker-compose push querynode'
sh 'docker pull registry.zilliz.com/milvus-distributed/milvus-distributed-dev:latest || true'
sh 'docker pull ${SOURCE_REPO}/writenode:${SOURCE_TAG} || true'
sh 'docker-compose build --force-rm writenode'
sh 'docker-compose push writenode'
}
} catch (exc) {
throw exc

View File

@ -9,6 +9,7 @@ try {
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d proxy'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=1 -d querynode'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=2 -d querynode'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e WRITE_NODE_ID=3 -d writenode'
}
dir ('build/docker/test') {

View File

@ -42,5 +42,20 @@ services:
networks:
- milvus
writenode:
image: ${TARGET_REPO}/writenode:${TARGET_TAG}
build:
context: ../../../
dockerfile: build/docker/deploy/writenode/DockerFile
cache_from:
- ${SOURCE_REPO}/writenode:${SOURCE_TAG}
environment:
PULSAR_ADDRESS: ${PULSAR_ADDRESS}
ETCD_ADDRESS: ${ETCD_ADDRESS}
MASTER_ADDRESS: ${MASTER_ADDRESS}
MINIO_ADDRESS: ${MINIO_ADDRESS}
networks:
- milvus
networks:
milvus:

View File

@ -0,0 +1,39 @@
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
FROM milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-latest AS openblas
#FROM alpine
FROM ubuntu:bionic-20200921
RUN apt-get update && apt-get install -y --no-install-recommends libtbb-dev gfortran
#RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/testing" >> /etc/apk/repositories
#RUN sed -i "s/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g" /etc/apk/repositories \
# && apk add --no-cache libtbb gfortran
COPY --from=openblas /usr/lib/libopenblas-r0.3.9.so /usr/lib/
RUN ln -s /usr/lib/libopenblas-r0.3.9.so /usr/lib/libopenblas.so.0 && \
ln -s /usr/lib/libopenblas.so.0 /usr/lib/libopenblas.so
COPY ./bin/writenode /milvus-distributed/bin/writenode
COPY ./configs/ /milvus-distributed/configs/
COPY ./lib/ /milvus-distributed/lib/
ENV LD_LIBRARY_PATH=/milvus-distributed/lib:$LD_LIBRARY_PATH:/usr/lib
WORKDIR /milvus-distributed/
CMD ["./bin/writenode"]

View File

@ -88,6 +88,7 @@ func (it *IndexAddTask) Execute() error {
t.table = it.table
t.indexID = it.indexID
t.kv = it.kv
t.req = it.req
var cancel func()
t.ctx, cancel = context.WithTimeout(it.ctx, reqTimeoutInterval)
defer cancel()
@ -121,7 +122,7 @@ type IndexBuildTask struct {
indexID UniqueID
kv kv.Base
savePaths []string
indexMeta *indexbuilderpb.IndexMeta
req *indexbuilderpb.BuildIndexRequest
}
func newIndexBuildTask() *IndexBuildTask {
@ -151,7 +152,7 @@ func (it *IndexBuildTask) Execute() error {
}
typeParams := make(map[string]string)
for _, kvPair := range it.indexMeta.Req.GetTypeParams() {
for _, kvPair := range it.req.GetTypeParams() {
key, value := kvPair.GetKey(), kvPair.GetValue()
_, ok := typeParams[key]
if ok {
@ -161,7 +162,7 @@ func (it *IndexBuildTask) Execute() error {
}
indexParams := make(map[string]string)
for _, kvPair := range it.indexMeta.Req.GetIndexParams() {
for _, kvPair := range it.req.GetIndexParams() {
key, value := kvPair.GetKey(), kvPair.GetValue()
_, ok := indexParams[key]
if ok {
@ -201,7 +202,7 @@ func (it *IndexBuildTask) Execute() error {
return blobs
}
toLoadDataPaths := it.indexMeta.Req.GetDataPaths()
toLoadDataPaths := it.req.GetDataPaths()
keys := make([]string, 0)
blobs := make([]*Blob, 0)
for _, path := range toLoadDataPaths {

View File

@ -181,7 +181,7 @@ func CreateServer(ctx context.Context) (*Master, error) {
m.scheduler.SetDDMsgStream(pulsarDDStream)
m.scheduler.SetIDAllocator(func() (UniqueID, error) { return m.idAllocator.AllocOne() })
flushClient, err := writerclient.NewWriterClient(Params.EtcdAddress, kvRootPath, Params.WriteNodeSegKvSubPath, pulsarDDStream)
flushClient, err := writerclient.NewWriterClient(Params.EtcdAddress, Params.MetaRootPath, Params.WriteNodeSegKvSubPath, pulsarDDStream)
if err != nil {
return nil, err
}

View File

@ -310,32 +310,28 @@ func printDDLPayloadValues(eventType EventTypeCode, colType schemapb.DataType, r
switch eventType {
case CreateCollectionEventType:
var req internalpb.CreateCollectionRequest
if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
if err := proto.UnmarshalText(val, &req); err != nil {
return err
}
msg := proto.MarshalTextString(&req)
fmt.Printf("\t\t%d : create collection: %s\n", i, msg)
fmt.Printf("\t\t%d : create collection: %v\n", i, req)
case DropCollectionEventType:
var req internalpb.DropPartitionRequest
if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
var req internalpb.DropCollectionRequest
if err := proto.UnmarshalText(val, &req); err != nil {
return err
}
msg := proto.MarshalTextString(&req)
fmt.Printf("\t\t%d : drop collection: %s\n", i, msg)
fmt.Printf("\t\t%d : drop collection: %v\n", i, req)
case CreatePartitionEventType:
var req internalpb.CreatePartitionRequest
if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
if err := proto.UnmarshalText(val, &req); err != nil {
return err
}
msg := proto.MarshalTextString(&req)
fmt.Printf("\t\t%d : create partition: %s\n", i, msg)
fmt.Printf("\t\t%d : create partition: %v\n", i, req)
case DropPartitionEventType:
var req internalpb.DropPartitionRequest
if err := proto.Unmarshal(([]byte)(val), &req); err != nil {
if err := proto.UnmarshalText(val, &req); err != nil {
return err
}
msg := proto.MarshalTextString(&req)
fmt.Printf("\t\t%d : drop partition: %s\n", i, msg)
fmt.Printf("\t\t%d : drop partition: %v\n", i, req)
default:
return errors.Errorf("undefined ddl event type %d", eventType)
}

View File

@ -542,15 +542,21 @@ func (ibNode *insertBufferNode) getMeta(segID UniqueID) (*etcdpb.SegmentMeta, *e
segMeta := &etcdpb.SegmentMeta{}
key := path.Join(SegmentPrefix, strconv.FormatInt(segID, 10))
value, _ := ibNode.kvClient.Load(key)
err := proto.UnmarshalText(value, segMeta)
value, err := ibNode.kvClient.Load(key)
if err != nil {
return nil, nil, err
}
err = proto.UnmarshalText(value, segMeta)
if err != nil {
return nil, nil, err
}
collMeta := &etcdpb.CollectionMeta{}
key = path.Join(CollectionPrefix, strconv.FormatInt(segMeta.GetCollectionID(), 10))
value, _ = ibNode.kvClient.Load(key)
value, err = ibNode.kvClient.Load(key)
if err != nil {
return nil, nil, err
}
err = proto.UnmarshalText(value, collMeta)
if err != nil {
return nil, nil, err