Impl minimum runnable writenode and writenode flowgraph

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
pull/4973/head^2
XuanYang-cn 2020-12-09 20:07:27 +08:00 committed by yefu.chen
parent de12fa5a10
commit 881be7f3e3
31 changed files with 1964 additions and 31 deletions

View File

@ -1,6 +1,6 @@
{
"name": "Milvus Distributed Dev Container Definition",
"image": "milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-20201120-092740",
"image": "milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-20201209-104246",
"runArgs": [ "--cap-add=SYS_PTRACE", "--security-opt", "seccomp=unconfined" ],
"workspaceFolder": "/go/src/github.com/zilliztech/milvus-distributed",
"workspaceMount": "source=${localWorkspaceFolder},target=/go/src/github.com/zilliztech/milvus-distributed,type=bind,consistency=cached",

.env
View File

@ -1,7 +1,7 @@
REPO=milvusdb/milvus-distributed-dev
ARCH=amd64
UBUNTU=18.04
DATE_VERSION=20201202-085131
DATE_VERSION=20201209-104246
LATEST_DATE_VERSION=latest
MINIO_ADDRESS=minio:9000
PULSAR_ADDRESS=pulsar://pulsar:6650

View File

@ -42,12 +42,18 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Cache Docker Volumes
- name: Cache CCache Volumes
uses: actions/cache@v1
with:
path: .docker
key: ubuntu${{ matrix.ubuntu }}-${{ hashFiles('internal/core/**') }}
restore-keys: ubuntu${{ matrix.ubuntu }}-
path: .docker/amd64-ubuntu${{ matrix.ubuntu }}-ccache
key: ubuntu${{ matrix.ubuntu }}-ccache-${{ hashFiles('internal/core/**') }}
restore-keys: ubuntu${{ matrix.ubuntu }}-ccache-
- name: Cache Go Mod Volumes
uses: actions/cache@v1
with:
path: .docker/amd64-ubuntu${{ matrix.ubuntu }}-go-mod
key: ubuntu${{ matrix.ubuntu }}-go-mod-${{ hashFiles('**/go.sum') }}
restore-keys: ubuntu${{ matrix.ubuntu }}-go-mod-
- name: Dockerfile Lint
uses: reviewdog/action-hadolint@v1
with:
@ -55,5 +61,7 @@ jobs:
reporter: github-pr-check # Default is github-pr-check
hadolint_ignore: DL3008
- name: Code Check
env:
CHECK_BUILDER: "1"
run: |
./build/builder.sh /bin/bash -c "make check-proto-product && make verifiers"

View File

@ -42,12 +42,18 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Cache Docker Volumes
- name: Cache CCache Volumes
uses: actions/cache@v1
with:
path: .docker
key: ubuntu${{ matrix.ubuntu }}-${{ hashFiles('internal/core/**') }}
restore-keys: ubuntu${{ matrix.ubuntu }}-
path: .docker/amd64-ubuntu${{ matrix.ubuntu }}-ccache
key: ubuntu${{ matrix.ubuntu }}-ccache-${{ hashFiles('internal/core/**') }}
restore-keys: ubuntu${{ matrix.ubuntu }}-ccache-
- name: Cache Go Mod Volumes
uses: actions/cache@v1
with:
path: .docker/amd64-ubuntu${{ matrix.ubuntu }}-go-mod
key: ubuntu${{ matrix.ubuntu }}-go-mod-${{ hashFiles('**/go.sum') }}
restore-keys: ubuntu${{ matrix.ubuntu }}-go-mod-
- name: Start Service
shell: bash
run: |

View File

@ -1,7 +1,8 @@
timeout(time: 10, unit: 'MINUTES') {
timeout(time: 20, unit: 'MINUTES') {
dir ("scripts") {
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz || echo \"ccache artfactory files not found!\"'
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz || echo \"thirdparty artfactory files not found!\"'
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz || echo \"Ccache artfactory files not found!\"'
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz || echo \"Thirdparty artfactory files not found!\"'
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $GO_MOD_ARTFACTORY_URL --cache_dir=\$GOPATH/pkg/mod -f milvus-distributed-go-mod-cache.tar.gz || echo \"Go mod artfactory files not found!\"'
}
sh '. ./scripts/before-install.sh && make install'
@ -10,6 +11,7 @@ timeout(time: 10, unit: 'MINUTES') {
withCredentials([usernamePassword(credentialsId: "${env.JFROG_CREDENTIALS_ID}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz -u ${USERNAME} -p ${PASSWORD}'
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz -u ${USERNAME} -p ${PASSWORD}'
sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $GO_MOD_ARTFACTORY_URL --cache_dir=\$GOPATH/pkg/mod -f milvus-distributed-go-mod-cache.tar.gz -u ${USERNAME} -p ${PASSWORD}'
}
}
}

View File

@ -21,11 +21,18 @@ all: build-cpp build-go
get-build-deps:
@(env bash $(PWD)/scripts/install_deps.sh)
getdeps:
@mkdir -p ${GOPATH}/bin
@which golangci-lint 1>/dev/null || (echo "Installing golangci-lint" && curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.27.0)
@which ruleguard 1>/dev/null || (echo "Installing ruleguard" && GO111MODULE=off go get github.com/quasilyte/go-ruleguard/...)
cppcheck:
@(env bash ${PWD}/scripts/core_build.sh -l)
generated-proto-go:export protoc:=${PWD}/cmake_build/thirdparty/protobuf/protobuf-build/protoc
generated-proto-go: build-cpp
@mkdir -p ${GOPATH}/bin
@which protoc-gen-go 1>/dev/null || (echo "Installing protoc-gen-go" && go get github.com/golang/protobuf/protoc-gen-go@v1.3.2)
@(env bash $(PWD)/scripts/proto_gen_go.sh)
check-proto-product: generated-proto-go
@ -51,7 +58,7 @@ ruleguard:
@${GOPATH}/bin/ruleguard -rules ruleguard.rules.go ./cmd/...
@${GOPATH}/bin/ruleguard -rules ruleguard.rules.go ./tests/go/...
verifiers: cppcheck fmt lint ruleguard
verifiers: getdeps cppcheck fmt lint ruleguard
# Builds various components locally.
build-go:

View File

@ -14,7 +14,6 @@ if [ "${1-}" = "pull" ]; then
fi
if [ "${1-}" = "gdbserver" ]; then
mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-gdbserver-cache"
mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-gdbserver-home"
chmod -R 777 "${DOCKER_VOLUME_DIRECTORY:-.docker}"
@ -45,7 +44,8 @@ gid=$(id -g)
[ "$uid" -lt 500 ] && uid=501
[ "$gid" -lt 500 ] && gid=$uid
mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-cache"
mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-ccache"
mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-go-mod"
chmod -R 777 "${DOCKER_VOLUME_DIRECTORY:-.docker}"
docker-compose pull --ignore-pull-failures ubuntu

View File

@ -40,6 +40,7 @@ pipeline {
CCACHE_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/ccache"
THIRDPARTY_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/thirdparty"
CUSTOM_THIRDPARTY_PATH = "${WORKSPACE}/3rdparty_download"
GO_MOD_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/go-mod"
}
steps {
container('build-env') {

View File

@ -11,12 +11,12 @@ spec:
runAsGroup: 2000
containers:
- name: build-env
image: milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-20201124-101232
image: milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-20201209-104246
env:
- name: OS_NAME
value: "ubuntu18.04"
- name: BUILD_ENV_IMAGE_ID
value: "f0f52760fde8758793f5a68c39ba20298c812e754de337ba4cc7fd8edf4ae7a9"
value: "ae09110abf11fc58e2bcd2393c4cd071f0c5cfe2b65ab1a75f9510f3b32921bc"
command:
- cat
tty: true

View File

@ -38,8 +38,7 @@ ENV GOROOT /usr/local/go
ENV GO111MODULE on
ENV PATH $GOPATH/bin:$GOROOT/bin:$PATH
RUN mkdir -p /usr/local/go && wget -qO- "https://golang.org/dl/go1.15.2.linux-amd64.tar.gz" | tar --strip-components=1 -xz -C /usr/local/go && \
mkdir -p "$GOPATH/src" "$GOPATH/src/github.com/zilliztech" "$GOPATH/bin" && \
go get github.com/golang/protobuf/protoc-gen-go@v1.3.2 && \
mkdir -p "$GOPATH/src" "$GOPATH/bin" && \
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b ${GOPATH}/bin v1.27.0 && \
export GO111MODULE=off && go get github.com/quasilyte/go-ruleguard/... && \
chmod -R 777 "$GOPATH" && chmod -R a+w $(go env GOTOOLDIR)

View File

@ -37,4 +37,4 @@ msgChannel:
dataDefinition: [0,1]
k2s: [0, 1]
search: [0, 1]
searchResult: [0, 1]
searchResult: [0, 1]

View File

@ -0,0 +1,41 @@
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
writeNode:
stats:
publishInterval: 1000 # milliseconds
dataSync:
flowGraph:
maxQueueLength: 1024
maxParallelism: 1024
msgStream:
insert:
#streamBufSize: 1024 # msgPack chan buffer size
recvBufSize: 1024 # msgPack chan buffer size
pulsarBufSize: 1024 # pulsar chan buffer size
delete:
#streamBufSize: 1024 # msgPack chan buffer size
recvBufSize: 1024 # msgPack chan buffer size
pulsarBufSize: 1024 # pulsar chan buffer size
search:
recvBufSize: 512
pulsarBufSize: 512
searchResult:
recvBufSize: 64
stats:
recvBufSize: 64
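
For illustration, a minimal sketch of reading the flowGraph settings above with gopkg.in/yaml.v2 — the struct and field names here are assumptions made for this example only; the write node itself loads this file through its ParamTable (writenode/param_table.go later in this commit):

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// writeNodeConf mirrors only the writeNode.dataSync.flowGraph keys above (illustrative).
type writeNodeConf struct {
	WriteNode struct {
		DataSync struct {
			FlowGraph struct {
				MaxQueueLength int32 `yaml:"maxQueueLength"`
				MaxParallelism int32 `yaml:"maxParallelism"`
			} `yaml:"flowGraph"`
		} `yaml:"dataSync"`
	} `yaml:"writeNode"`
}

func main() {
	raw := []byte("writeNode:\n  dataSync:\n    flowGraph:\n      maxQueueLength: 1024\n      maxParallelism: 1024\n")
	var conf writeNodeConf
	if err := yaml.Unmarshal(raw, &conf); err != nil {
		panic(err)
	}
	fmt.Println(conf.WriteNode.DataSync.FlowGraph.MaxQueueLength) // 1024
}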

View File

@ -24,7 +24,8 @@ services:
MINIO_ADDRESS: ${MINIO_ADDRESS}
volumes: &ubuntu-volumes
- .:/go/src/github.com/zilliztech/milvus-distributed:delegated
- ${DOCKER_VOLUME_DIRECTORY:-.docker}/${ARCH}-ubuntu${UBUNTU}-cache:/ccache:delegated
- ${DOCKER_VOLUME_DIRECTORY:-.docker}/${ARCH}-ubuntu${UBUNTU}-ccache:/ccache:delegated
- ${DOCKER_VOLUME_DIRECTORY:-.docker}/${ARCH}-ubuntu${UBUNTU}-go-mod:/go/pkg/mod:delegated
working_dir: "/go/src/github.com/zilliztech/milvus-distributed"
command: &ubuntu-command >
/bin/bash -c "

View File

@ -28,14 +28,8 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
return nil, err
}
reader.currentEventReader = nil
if reader.currentOffset >= int32(reader.buffer.Len()) {
return nil, nil
}
// skip remaining bytes of this event
remaining := int(reader.currentOffset) - (reader.bufferLength - reader.buffer.Len())
reader.buffer.Next(remaining)
}
if reader.currentOffset >= int32(reader.buffer.Len()) {
if reader.buffer.Len() <= 0 {
return nil, nil
}
eventReader, err := newEventReader(reader.descriptorEvent.PayloadDataType, reader.buffer)

View File

@ -37,6 +37,8 @@ macro( build_arrow )
"-DARROW_BUILD_UTILITIES=OFF"
"-DARROW_PARQUET=ON"
"-DPARQUET_BUILD_SHARED=OFF"
"-DThrift_SOURCE=BUNDLED"
"-Dutf8proc_SOURCE=BUNDLED"
"-DARROW_S3=OFF"
"-DCMAKE_VERBOSE_MAKEFILE=ON"
"-DCMAKE_INSTALL_PREFIX=${CMAKE_CURRENT_BINARY_DIR}"

View File

@ -0,0 +1,684 @@
package storage
import (
"fmt"
"strconv"
"strings"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
const (
TsField int = 1
RequestField int = 100
)
type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
)
type Blob struct {
key string
value []byte
}
type Base struct {
Version int
CommitID int
TanentID UniqueID
Schema *etcdpb.CollectionMeta
}
type FieldData interface{}
type BoolFieldData struct {
NumRows int
data []bool
}
type Int8FieldData struct {
NumRows int
data []int8
}
type Int16FieldData struct {
NumRows int
data []int16
}
type Int32FieldData struct {
NumRows int
data []int32
}
type Int64FieldData struct {
NumRows int
data []int64
}
type FloatFieldData struct {
NumRows int
data []float32
}
type DoubleFieldData struct {
NumRows int
data []float64
}
type StringFieldData struct {
NumRows int
data []string
}
type BinaryVectorFieldData struct {
NumRows int
data []byte
dim int
}
type FloatVectorFieldData struct {
NumRows int
data []float32
dim int
}
// TODO: more types of FieldData
// system field id:
// 0: unique row id
// 1: timestamp
// 100: first user field id
// 101: second user field id
// 102: ...
// example row_schema: {float_field, int_field, float_vector_field, string_field}
// Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>}
type InsertData struct {
Data map[int]FieldData // field id to field data
}
// Blob key example:
// ${tanent}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
type InsertCodec struct {
Base
readerCloseFunc []func() error
}
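// Serialize encodes one InsertData into binlog blobs, producing one blob per
// field; each blob key follows the layout described above
// (${tanent}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}).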
func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segmentID UniqueID, data *InsertData, ts []Timestamp) ([]*Blob, error) {
var blobs []*Blob
var writer *InsertBinlogWriter
var err error
for fieldID, value := range data.Data {
switch singleData := value.(type) {
case BoolFieldData:
writer, err = NewInsertBinlogWriter(schemapb.DataType_BOOL)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
err = eventWriter.AddBoolToPayload(singleData.data)
if err != nil {
return nil, err
}
case Int8FieldData:
writer, err = NewInsertBinlogWriter(schemapb.DataType_INT8)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
err = eventWriter.AddInt8ToPayload(singleData.data)
if err != nil {
return nil, err
}
case Int16FieldData:
writer, err = NewInsertBinlogWriter(schemapb.DataType_INT16)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
err = eventWriter.AddInt16ToPayload(singleData.data)
if err != nil {
return nil, err
}
case Int32FieldData:
writer, err = NewInsertBinlogWriter(schemapb.DataType_INT32)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
err = eventWriter.AddInt32ToPayload(singleData.data)
if err != nil {
return nil, err
}
case Int64FieldData:
writer, err = NewInsertBinlogWriter(schemapb.DataType_INT64)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
err = eventWriter.AddInt64ToPayload(singleData.data)
if err != nil {
return nil, err
}
case FloatFieldData:
writer, err = NewInsertBinlogWriter(schemapb.DataType_FLOAT)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
err = eventWriter.AddFloatToPayload(singleData.data)
if err != nil {
return nil, err
}
case DoubleFieldData:
writer, err = NewInsertBinlogWriter(schemapb.DataType_DOUBLE)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
err = eventWriter.AddDoubleToPayload(singleData.data)
if err != nil {
return nil, err
}
case StringFieldData:
writer, err = NewInsertBinlogWriter(schemapb.DataType_STRING)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
for _, singleString := range singleData.data {
err = eventWriter.AddOneStringToPayload(singleString)
}
if err != nil {
return nil, err
}
case BinaryVectorFieldData:
writer, err = NewInsertBinlogWriter(schemapb.DataType_VECTOR_BINARY)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
err = eventWriter.AddBinaryVectorToPayload(singleData.data, singleData.dim)
if err != nil {
return nil, err
}
case FloatVectorFieldData:
writer, err = NewInsertBinlogWriter(schemapb.DataType_VECTOR_FLOAT)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextInsertEventWriter()
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
err = eventWriter.AddFloatVectorToPayload(singleData.data, singleData.dim)
if err != nil {
return nil, err
}
}
if writer == nil {
return nil, fmt.Errorf("binlog writer is nil")
}
writer.CollectionID = insertCodec.Schema.ID
writer.PartitionID = partitionID
writer.SegmentID = segmentID
writer.SetStartTimeStamp(ts[0])
writer.SetEndTimeStamp(ts[len(ts)-1])
err := writer.Close()
if err != nil {
return nil, err
}
buffer := writer.GetBuffer()
blobKey := fmt.Sprintf("%d/insert_log/%d/%d/%d/%d/%d",
insertCodec.TanentID, insertCodec.Schema.ID, partitionID, segmentID, fieldID, logIdx)
blobs = append(blobs, &Blob{
key: blobKey,
value: buffer,
})
}
return blobs, nil
}
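// Deserialize is the inverse of Serialize: it parses the partition and segment
// IDs back out of the first blob key and rebuilds an InsertData from the binlog
// payloads, keeping the underlying readers open until Close is called.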
func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) {
if len(blobs) == 0 {
return -1, -1, nil, fmt.Errorf("blobs is empty")
}
readerClose := func(reader *BinlogReader) func() error {
return func() error { return reader.Close() }
}
pID, _ := strconv.ParseInt(strings.Split(blobs[0].key, "/")[3], 0, 10)
sID, _ := strconv.ParseInt(strings.Split(blobs[0].key, "/")[4], 0, 10)
var resultData InsertData
resultData.Data = make(map[int]FieldData)
for _, blob := range blobs {
fieldID, err := strconv.Atoi(strings.Split(blob.key, "/")[5])
if err != nil {
return -1, -1, nil, err
}
dataType := insertCodec.Schema.Schema.Fields[fieldID].GetDataType()
switch dataType {
case schemapb.DataType_BOOL:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return -1, -1, nil, err
}
var boolFieldData BoolFieldData
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return -1, -1, nil, err
}
boolFieldData.data, err = eventReader.GetBoolFromPayload()
if err != nil {
return -1, -1, nil, err
}
boolFieldData.NumRows = len(boolFieldData.data)
resultData.Data[fieldID] = boolFieldData
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
case schemapb.DataType_INT8:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return -1, -1, nil, err
}
var int8FieldData Int8FieldData
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return -1, -1, nil, err
}
int8FieldData.data, err = eventReader.GetInt8FromPayload()
if err != nil {
return -1, -1, nil, err
}
int8FieldData.NumRows = len(int8FieldData.data)
resultData.Data[fieldID] = int8FieldData
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
case schemapb.DataType_INT16:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return -1, -1, nil, err
}
var int16FieldData Int16FieldData
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return -1, -1, nil, err
}
int16FieldData.data, err = eventReader.GetInt16FromPayload()
if err != nil {
return -1, -1, nil, err
}
int16FieldData.NumRows = len(int16FieldData.data)
resultData.Data[fieldID] = int16FieldData
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
case schemapb.DataType_INT32:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return -1, -1, nil, err
}
var int32FieldData Int32FieldData
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return -1, -1, nil, err
}
int32FieldData.data, err = eventReader.GetInt32FromPayload()
if err != nil {
return -1, -1, nil, err
}
int32FieldData.NumRows = len(int32FieldData.data)
resultData.Data[fieldID] = int32FieldData
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
case schemapb.DataType_INT64:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return -1, -1, nil, err
}
var int64FieldData Int64FieldData
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return -1, -1, nil, err
}
int64FieldData.data, err = eventReader.GetInt64FromPayload()
if err != nil {
return -1, -1, nil, err
}
int64FieldData.NumRows = len(int64FieldData.data)
resultData.Data[fieldID] = int64FieldData
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
case schemapb.DataType_FLOAT:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return -1, -1, nil, err
}
var floatFieldData FloatFieldData
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return -1, -1, nil, err
}
floatFieldData.data, err = eventReader.GetFloatFromPayload()
if err != nil {
return -1, -1, nil, err
}
floatFieldData.NumRows = len(floatFieldData.data)
resultData.Data[fieldID] = floatFieldData
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
case schemapb.DataType_DOUBLE:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return -1, -1, nil, err
}
var doubleFieldData DoubleFieldData
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return -1, -1, nil, err
}
doubleFieldData.data, err = eventReader.GetDoubleFromPayload()
if err != nil {
return -1, -1, nil, err
}
doubleFieldData.NumRows = len(doubleFieldData.data)
resultData.Data[fieldID] = doubleFieldData
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
case schemapb.DataType_STRING:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return -1, -1, nil, err
}
var stringFieldData StringFieldData
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return -1, -1, nil, err
}
length, err := eventReader.GetPayloadLengthFromReader()
stringFieldData.NumRows = length
if err != nil {
return -1, -1, nil, err
}
for i := 0; i < length; i++ {
singleString, err := eventReader.GetOneStringFromPayload(i)
if err != nil {
return -1, -1, nil, err
}
stringFieldData.data = append(stringFieldData.data, singleString)
}
resultData.Data[fieldID] = stringFieldData
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
case schemapb.DataType_VECTOR_BINARY:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return -1, -1, nil, err
}
var binaryVectorFieldData BinaryVectorFieldData
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return -1, -1, nil, err
}
binaryVectorFieldData.data, binaryVectorFieldData.dim, err = eventReader.GetBinaryVectorFromPayload()
if err != nil {
return -1, -1, nil, err
}
binaryVectorFieldData.NumRows = len(binaryVectorFieldData.data)
resultData.Data[fieldID] = binaryVectorFieldData
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
case schemapb.DataType_VECTOR_FLOAT:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return -1, -1, nil, err
}
var floatVectorFieldData FloatVectorFieldData
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return -1, -1, nil, err
}
floatVectorFieldData.data, floatVectorFieldData.dim, err = eventReader.GetFloatVectorFromPayload()
if err != nil {
return -1, -1, nil, err
}
floatVectorFieldData.NumRows = len(floatVectorFieldData.data) / floatVectorFieldData.dim
resultData.Data[fieldID] = floatVectorFieldData
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
}
}
return pID, sID, &resultData, nil
}
func (insertCodec *InsertCodec) Close() error {
for _, closeFunc := range insertCodec.readerCloseFunc {
err := closeFunc()
if err != nil {
return err
}
}
return nil
}
// Blob key example:
// ${tanent}/data_definition_log/${collection_id}/${field_type}/${log_idx}
type DataDefinitionCodec struct {
Base
readerCloseFunc []func() error
}
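// Serialize writes the DDL requests and their timestamps into two blobs: a
// string-payload blob of the requests keyed with RequestField and an int64
// blob of the corresponding timestamps keyed with TsField.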
func (dataDefinitionCodec DataDefinitionCodec) Serialize(logIdx int, ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING)
if err != nil {
return nil, err
}
var blobs []*Blob
for pos, req := range ddRequests {
switch eventTypes[pos] {
case CreateCollectionEventType:
eventWriter, err := writer.NextCreateCollectionEventWriter()
if err != nil {
return nil, err
}
err = eventWriter.AddOneStringToPayload(req)
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[pos])
eventWriter.SetEndTimestamp(ts[pos])
case DropCollectionEventType:
eventWriter, err := writer.NextDropCollectionEventWriter()
if err != nil {
return nil, err
}
err = eventWriter.AddOneStringToPayload(req)
eventWriter.SetStartTimestamp(ts[pos])
eventWriter.SetEndTimestamp(ts[pos])
if err != nil {
return nil, err
}
case CreatePartitionEventType:
eventWriter, err := writer.NextCreatePartitionEventWriter()
if err != nil {
return nil, err
}
err = eventWriter.AddOneStringToPayload(req)
eventWriter.SetStartTimestamp(ts[pos])
eventWriter.SetEndTimestamp(ts[pos])
if err != nil {
return nil, err
}
case DropPartitionEventType:
eventWriter, err := writer.NextDropPartitionEventWriter()
if err != nil {
return nil, err
}
err = eventWriter.AddOneStringToPayload(req)
eventWriter.SetStartTimestamp(ts[pos])
eventWriter.SetEndTimestamp(ts[pos])
if err != nil {
return nil, err
}
}
}
err = writer.Close()
if err != nil {
return nil, err
}
buffer := writer.GetBuffer()
blobKey := fmt.Sprintf("%d/data_definition_log/%d/%d/%d",
dataDefinitionCodec.TanentID, dataDefinitionCodec.Schema.ID, RequestField, logIdx)
blobs = append(blobs, &Blob{
key: blobKey,
value: buffer,
})
writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextCreateCollectionEventWriter()
if err != nil {
return nil, err
}
var int64Ts []int64
for _, singleTs := range ts {
int64Ts = append(int64Ts, int64(singleTs))
}
err = eventWriter.AddInt64ToPayload(int64Ts)
if err != nil {
return nil, err
}
err = writer.Close()
if err != nil {
return nil, err
}
buffer = writer.GetBuffer()
blobKey = fmt.Sprintf("%d/data_definition_log/%d/%d/%d",
dataDefinitionCodec.TanentID, dataDefinitionCodec.Schema.ID, TsField, logIdx)
blobs = append(blobs, &Blob{
key: blobKey,
value: buffer,
})
return blobs, nil
}
func (dataDefinitionCodec DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) {
if len(blobs) == 0 {
return nil, nil, fmt.Errorf("blobs is empty")
}
readerClose := func(reader *BinlogReader) func() error {
return func() error { return reader.Close() }
}
var requestsStrings []string
var resultTs []Timestamp
for _, blob := range blobs {
fieldID, err := strconv.Atoi(strings.Split(blob.key, "/")[3])
if err != nil {
return nil, nil, err
}
switch fieldID {
case TsField:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return nil, nil, err
}
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return nil, nil, err
}
int64Ts, err := eventReader.GetInt64FromPayload()
if err != nil {
return nil, nil, err
}
for _, singleTs := range int64Ts {
resultTs = append(resultTs, Timestamp(singleTs))
}
dataDefinitionCodec.readerCloseFunc = append(dataDefinitionCodec.readerCloseFunc, readerClose(binlogReader))
case RequestField:
binlogReader, err := NewBinlogReader(blob.value)
if err != nil {
return nil, nil, err
}
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return nil, nil, err
}
for eventReader != nil {
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return nil, nil, err
}
for i := 0; i < length; i++ {
singleString, err := eventReader.GetOneStringFromPayload(i)
if err != nil {
return nil, nil, err
}
requestsStrings = append(requestsStrings, singleString)
}
eventReader, err = binlogReader.NextEventReader()
if err != nil {
return nil, nil, err
}
}
dataDefinitionCodec.readerCloseFunc = append(dataDefinitionCodec.readerCloseFunc, readerClose(binlogReader))
}
}
return resultTs, requestsStrings, nil
}
func (dataDefinitionCodec *DataDefinitionCodec) Close() error {
for _, closeFunc := range dataDefinitionCodec.readerCloseFunc {
err := closeFunc()
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,227 @@
package storage
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
func TestInsertCodecWriter(t *testing.T) {
base := Base{
Version: 1,
CommitID: 1,
TanentID: 1,
Schema: &etcdpb.CollectionMeta{
ID: 1,
CreateTime: 1,
SegmentIDs: []int64{0, 1},
PartitionTags: []string{"partition_0", "partition_1"},
Schema: &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
Name: "field_bool",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_BOOL,
},
{
Name: "field_int8",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_INT8,
},
{
Name: "field_int16",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_INT16,
},
{
Name: "field_int32",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_INT32,
},
{
Name: "field_int64",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_INT64,
},
{
Name: "field_float",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_FLOAT,
},
{
Name: "field_double",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_DOUBLE,
},
{
Name: "field_string",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_STRING,
},
{
Name: "field_binary_vector",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_VECTOR_BINARY,
},
{
Name: "field_float_vector",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_VECTOR_FLOAT,
},
},
},
},
}
insertCodec := &InsertCodec{
base,
make([]func() error, 0),
}
insertData := &InsertData{
Data: map[int]FieldData{
0: BoolFieldData{
NumRows: 2,
data: []bool{true, false},
},
1: Int8FieldData{
NumRows: 2,
data: []int8{1, 2},
},
2: Int16FieldData{
NumRows: 2,
data: []int16{1, 2},
},
3: Int32FieldData{
NumRows: 2,
data: []int32{1, 2},
},
4: Int64FieldData{
NumRows: 2,
data: []int64{1, 2},
},
5: FloatFieldData{
NumRows: 2,
data: []float32{1, 2},
},
6: DoubleFieldData{
NumRows: 2,
data: []float64{1, 2},
},
7: StringFieldData{
NumRows: 2,
data: []string{"1", "2"},
},
8: BinaryVectorFieldData{
NumRows: 8,
data: []byte{0, 255, 0, 1, 0, 1, 0, 1},
dim: 8,
},
9: FloatVectorFieldData{
NumRows: 1,
data: []float32{0, 1, 2, 3, 4, 5, 6, 7},
dim: 8,
},
},
}
blobs, err := insertCodec.Serialize(1, 1, 1, insertData, []Timestamp{0, 1})
assert.Nil(t, err)
partitionID, segmentID, resultData, err := insertCodec.Deserialize(blobs)
assert.Nil(t, err)
assert.Equal(t, partitionID, int64(1))
assert.Equal(t, segmentID, int64(1))
assert.Equal(t, insertData, resultData)
assert.Nil(t, insertCodec.Close())
}
func TestDDCodecWriter(t *testing.T) {
base := Base{
Version: 1,
CommitID: 1,
TanentID: 1,
Schema: &etcdpb.CollectionMeta{
ID: 1,
CreateTime: 1,
SegmentIDs: []int64{0, 1},
PartitionTags: []string{"partition_0", "partition_1"},
Schema: &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
Name: "field_1",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_INT32,
},
{
Name: "field_2",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_INT64,
},
{
Name: "field_3",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_STRING,
},
{
Name: "field_3",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_STRING,
},
{
Name: "field_3",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_STRING,
},
},
},
},
}
dataDefinitionCodec := &DataDefinitionCodec{
base,
make([]func() error, 0),
}
ts := []Timestamp{
1,
2,
3,
4,
}
ddRequests := []string{
"CreateCollection",
"DropCollection",
"CreatePartition",
"DropPartition",
}
eventTypeCodes := []EventTypeCode{
CreateCollectionEventType,
DropCollectionEventType,
CreatePartitionEventType,
DropPartitionEventType,
}
blobs, err := dataDefinitionCodec.Serialize(1, ts, ddRequests, eventTypeCodes)
assert.Nil(t, err)
resultTs, resultRequests, err := dataDefinitionCodec.Deserialize(blobs)
assert.Nil(t, err)
assert.Equal(t, resultTs, ts)
assert.Equal(t, resultRequests, ddRequests)
assert.Nil(t, dataDefinitionCodec.Close())
}

View File

@ -90,7 +90,8 @@ func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventRea
return nil, err
}
payloadReader, err := NewPayloadReader(datatype, buffer.Bytes())
payloadBuffer := buffer.Next(int(reader.EventLength - reader.eventHeader.GetMemoryUsageInBytes() - reader.GetEventDataFixPartSize()))
payloadReader, err := NewPayloadReader(datatype, payloadBuffer)
if err != nil {
return nil, err
}

View File

@ -0,0 +1,80 @@
package writenode
import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
type dataSyncService struct {
ctx context.Context
fg *flowgraph.TimeTickedFlowGraph
}
func newDataSyncService(ctx context.Context) *dataSyncService {
return &dataSyncService{
ctx: ctx,
fg: nil,
}
}
func (dsService *dataSyncService) start() {
dsService.initNodes()
dsService.fg.Start()
}
func (dsService *dataSyncService) close() {
if dsService.fg != nil {
dsService.fg.Close()
}
}
func (dsService *dataSyncService) initNodes() {
// TODO: add delete pipeline support
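// Minimal pipeline wired below: dmStreamNode -> filterDmNode -> writeNode -> serviceTimeNode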
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
var dmStreamNode Node = newDmInputNode(dsService.ctx)
var filterDmNode Node = newFilteredDmNode()
var writeNode Node = newWriteNode()
var serviceTimeNode Node = newServiceTimeNode()
dsService.fg.AddNode(&dmStreamNode)
dsService.fg.AddNode(&filterDmNode)
dsService.fg.AddNode(&writeNode)
dsService.fg.AddNode(&serviceTimeNode)
var err = dsService.fg.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{filterDmNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", dmStreamNode.Name())
}
err = dsService.fg.SetEdges(filterDmNode.Name(),
[]string{dmStreamNode.Name()},
[]string{writeNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", filterDmNode.Name())
}
err = dsService.fg.SetEdges(writeNode.Name(),
[]string{filterDmNode.Name()},
[]string{serviceTimeNode.Name()},
)
if err != nil {
log.Fatal("set edges failed in node:", writeNode.Name())
}
err = dsService.fg.SetEdges(serviceTimeNode.Name(),
[]string{writeNode.Name()},
[]string{},
)
if err != nil {
log.Fatal("set edges failed in node:", serviceTimeNode.Name())
}
}

View File

@ -0,0 +1,143 @@
package writenode
import (
"context"
"encoding/binary"
"math"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
// NOTE: start pulsar before test
func TestDataSyncService_Start(t *testing.T) {
Params.Init()
const ctxTimeInMillisecond = 200
const closeWithDeadline = true
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
// init write node
pulsarURL, _ := Params.pulsarAddress()
node := NewWriteNode(ctx, 0)
// test data generate
const DIM = 16
const N = 10
var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
var rawData []byte
for _, ele := range vec {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
rawData = append(rawData, buf...)
}
bs := make([]byte, 4)
binary.LittleEndian.PutUint32(bs, 1)
rawData = append(rawData, bs...)
var records []*commonpb.Blob
for i := 0; i < N; i++ {
blob := &commonpb.Blob{
Value: rawData,
}
records = append(records, blob)
}
timeRange := TimeRange{
timestampMin: 0,
timestampMax: math.MaxUint64,
}
// messages generate
const MSGLENGTH = 10
insertMessages := make([]msgstream.TsMsg, 0)
for i := 0; i < MSGLENGTH; i++ {
randt := rand.Intn(MSGLENGTH)
// randt := i
var msg msgstream.TsMsg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{
uint32(i), uint32(i),
},
},
InsertRequest: internalPb.InsertRequest{
MsgType: internalPb.MsgType_kInsert,
ReqID: int64(0),
CollectionName: "collection0",
PartitionTag: "default",
SegmentID: int64(0),
ChannelID: int64(0),
ProxyID: int64(0),
Timestamps: []uint64{uint64(randt + 1000), uint64(randt + 1000)},
RowIDs: []int64{int64(i), int64(i)},
RowData: []*commonpb.Blob{
{Value: rawData},
{Value: rawData},
},
},
}
insertMessages = append(insertMessages, msg)
}
msgPack := msgstream.MsgPack{
BeginTs: timeRange.timestampMin,
EndTs: timeRange.timestampMax,
Msgs: insertMessages,
}
// generate timeTick
timeTickMsgPack := msgstream.MsgPack{}
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
},
TimeTickMsg: internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
PeerID: UniqueID(0),
Timestamp: math.MaxUint64,
},
}
timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
// pulsar produce
const receiveBufSize = 1024
producerChannels := Params.insertChannelNames()
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(producerChannels)
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
err := insertMsgStream.Produce(&msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
// dataSync
node.dataSyncService = newDataSyncService(node.ctx)
go node.dataSyncService.start()
node.Close()
<-ctx.Done()
}

View File

@ -0,0 +1,64 @@
package writenode
import (
"log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
type filterDmNode struct {
BaseNode
}
func (fdmNode *filterDmNode) Name() string {
return "fdmNode"
}
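// Operate keeps only the insert messages from the incoming stream packet and
// wraps them, together with the packet's time range, into an insertMsg for the
// downstream write node.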
func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
if len(in) != 1 {
log.Println("Invalid operate message input in filterDmNode, input length = ", len(in))
// TODO: add error handling
}
msMsg, ok := (*in[0]).(*MsgStreamMsg)
if !ok {
log.Println("type assertion failed for MsgStreamMsg")
// TODO: add error handling
}
var iMsg = insertMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
timeRange: TimeRange{
timestampMin: msMsg.TimestampMin(),
timestampMax: msMsg.TimestampMax(),
},
}
for _, msg := range msMsg.TsMessages() {
switch msg.Type() {
case internalPb.MsgType_kInsert:
iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
// case internalPb.MsgType_kDelete:
// dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask))
default:
log.Println("Non supporting message type:", msg.Type())
}
}
var res Msg = &iMsg
return []*Msg{&res}
}
func newFilteredDmNode() *filterDmNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &filterDmNode{
BaseNode: baseNode,
}
}

View File

@ -0,0 +1,61 @@
package writenode
import (
"log"
)
type (
writeNode struct {
BaseNode
}
)
func (wNode *writeNode) Name() string {
return "wNode"
}
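// Operate currently only logs the incoming insert messages and forwards their
// time range downstream as a serviceTimeMsg; actual persistence is still a TODO.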
func (wNode *writeNode) Operate(in []*Msg) []*Msg {
log.Println("=========== WriteNode Operating")
if len(in) != 1 {
log.Println("Invalid operate message input in writetNode, input length = ", len(in))
// TODO: add error handling
}
iMsg, ok := (*in[0]).(*insertMsg)
if !ok {
log.Println("type assertion failed for insertMsg")
// TODO: add error handling
}
log.Println("=========== insertMsg length:", len(iMsg.insertMessages))
for _, task := range iMsg.insertMessages {
if len(task.RowIDs) != len(task.Timestamps) || len(task.RowIDs) != len(task.RowData) {
log.Println("Error, misaligned messages detected")
continue
}
log.Println("Timestamp: ", task.Timestamps[0])
log.Printf("t(%d) : %v ", task.Timestamps[0], task.RowData[0])
}
var res Msg = &serviceTimeMsg{
timeRange: iMsg.timeRange,
}
// TODO
return []*Msg{&res}
}
func newWriteNode() *writeNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &writeNode{
BaseNode: baseNode,
}
}

View File

@ -0,0 +1,102 @@
package writenode
import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
type (
Msg = flowgraph.Msg
MsgStreamMsg = flowgraph.MsgStreamMsg
SegmentID = UniqueID
)
type (
key2SegMsg struct {
tsMessages []msgstream.TsMsg
timeRange TimeRange
}
schemaUpdateMsg struct {
timeRange TimeRange
}
insertMsg struct {
insertMessages []*msgstream.InsertMsg
timeRange TimeRange
}
deleteMsg struct {
deleteMessages []*msgstream.DeleteMsg
timeRange TimeRange
}
serviceTimeMsg struct {
timeRange TimeRange
}
InsertData struct {
insertIDs map[SegmentID][]UniqueID
insertTimestamps map[SegmentID][]Timestamp
insertRecords map[SegmentID][]*commonpb.Blob
insertOffset map[SegmentID]int64
}
DeleteData struct {
deleteIDs map[SegmentID][]UniqueID
deleteTimestamps map[SegmentID][]Timestamp
deleteOffset map[SegmentID]int64
}
DeleteRecord struct {
entityID UniqueID
timestamp Timestamp
segmentID UniqueID
}
DeletePreprocessData struct {
deleteRecords []*DeleteRecord
count int32
}
)
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
return ksMsg.timeRange.timestampMax
}
func (ksMsg *key2SegMsg) DownStreamNodeIdx() int {
return 0
}
func (suMsg *schemaUpdateMsg) TimeTick() Timestamp {
return suMsg.timeRange.timestampMax
}
func (suMsg *schemaUpdateMsg) DownStreamNodeIdx() int {
return 0
}
func (iMsg *insertMsg) TimeTick() Timestamp {
return iMsg.timeRange.timestampMax
}
func (iMsg *insertMsg) DownStreamNodeIdx() int {
return 0
}
func (dMsg *deleteMsg) TimeTick() Timestamp {
return dMsg.timeRange.timestampMax
}
func (dMsg *deleteMsg) DownStreamNodeIdx() int {
return 0
}
func (stMsg *serviceTimeMsg) TimeTick() Timestamp {
return stMsg.timeRange.timestampMax
}
func (stMsg *serviceTimeMsg) DownStreamNodeIdx() int {
return 0
}
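
Every message type above implements the same two methods, which is what the flowgraph package dispatches on. A minimal sketch of the interface they appear to satisfy — inferred from these methods as an assumption, not the actual definition in internal/util/flowgraph:

package flowgraphsketch

// Timestamp mirrors typeutil.Timestamp; uint64 is an assumption for this sketch.
type Timestamp = uint64

// Msg is a sketch of the interface the writenode messages appear to satisfy.
type Msg interface {
	TimeTick() Timestamp
	DownStreamNodeIdx() int
}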

View File

@ -0,0 +1,39 @@
package writenode
import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
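// newDmInputNode builds the flowgraph input node backed by a Pulsar time-tick
// message stream that consumes the configured insert channels.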
func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
receiveBufSize := Params.insertReceiveBufSize()
pulsarBufSize := Params.insertPulsarBufSize()
msgStreamURL, err := Params.pulsarAddress()
if err != nil {
log.Fatal(err)
}
consumeChannels := Params.insertChannelNames()
consumeSubName := Params.msgChannelSubName()
insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
// TODO: could panic on nil pointer
insertStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
// TODO: could panic on nil pointer
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var stream msgstream.MsgStream = insertStream
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
return node
}

View File

@ -0,0 +1,9 @@
package writenode
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
type (
Node = flowgraph.Node
BaseNode = flowgraph.BaseNode
InputNode = flowgraph.InputNode
)

View File

@ -0,0 +1,46 @@
package writenode
import (
"log"
)
type serviceTimeNode struct {
BaseNode
}
func (stNode *serviceTimeNode) Name() string {
return "stNode"
}
func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
if len(in) != 1 {
log.Println("Invalid operate message input in serviceTimeNode, input length = ", len(in))
// TODO: add error handling
}
// serviceTimeMsg, ok := (*in[0]).(*serviceTimeMsg)
_, ok := (*in[0]).(*serviceTimeMsg)
if !ok {
log.Println("type assertion failed for serviceTimeMsg")
// TODO: add error handling
}
// update service time
// (*(*stNode.replica).getTSafe()).set(serviceTimeMsg.timeRange.timestampMax)
// fmt.Println("update tSafe to:", getPhysicalTime(serviceTimeMsg.timeRange.timestampMax))
return nil
}
func newServiceTimeNode() *serviceTimeNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &serviceTimeNode{
BaseNode: baseNode,
}
}

View File

@ -0,0 +1,236 @@
package writenode
import (
"log"
"os"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
}
var Params ParamTable
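// Init loads advanced/write_node.yaml and resolves this node's ID from the
// WRITE_NODE_ID environment variable, falling back to the first entry of the
// configured write node ID list (or "0" if the list is empty).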
func (p *ParamTable) Init() {
p.BaseTable.Init()
err := p.LoadYaml("advanced/write_node.yaml")
if err != nil {
panic(err)
}
writeNodeIDStr := os.Getenv("WRITE_NODE_ID")
if writeNodeIDStr == "" {
writeNodeIDList := p.WriteNodeIDList()
if len(writeNodeIDList) <= 0 {
writeNodeIDStr = "0"
} else {
writeNodeIDStr = strconv.Itoa(int(writeNodeIDList[0]))
}
}
p.Save("_writeNodeID", writeNodeIDStr)
}
func (p *ParamTable) pulsarAddress() (string, error) {
url, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
}
return url, nil
}
func (p *ParamTable) WriteNodeID() UniqueID {
writeNodeID, err := p.Load("_writeNodeID")
if err != nil {
panic(err)
}
id, err := strconv.Atoi(writeNodeID)
if err != nil {
panic(err)
}
return UniqueID(id)
}
func (p *ParamTable) insertChannelRange() []int {
insertChannelRange, err := p.Load("msgChannel.channelRange.insert")
if err != nil {
panic(err)
}
return paramtable.ConvertRangeToIntRange(insertChannelRange, ",")
}
// advanced params
// stats
func (p *ParamTable) statsPublishInterval() int {
return p.ParseInt("writeNode.stats.publishInterval")
}
// dataSync:
func (p *ParamTable) flowGraphMaxQueueLength() int32 {
return p.ParseInt32("writeNode.dataSync.flowGraph.maxQueueLength")
}
func (p *ParamTable) flowGraphMaxParallelism() int32 {
return p.ParseInt32("writeNode.dataSync.flowGraph.maxParallelism")
}
// msgStream
func (p *ParamTable) insertReceiveBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.insert.recvBufSize")
}
func (p *ParamTable) insertPulsarBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.insert.pulsarBufSize")
}
func (p *ParamTable) searchReceiveBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.search.recvBufSize")
}
func (p *ParamTable) searchPulsarBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.search.pulsarBufSize")
}
func (p *ParamTable) searchResultReceiveBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.searchResult.recvBufSize")
}
func (p *ParamTable) statsReceiveBufSize() int64 {
return p.ParseInt64("writeNode.msgStream.stats.recvBufSize")
}
func (p *ParamTable) etcdAddress() string {
etcdAddress, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
return etcdAddress
}
func (p *ParamTable) metaRootPath() string {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := p.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
return rootPath + "/" + subPath
}
func (p *ParamTable) gracefulTime() int64 {
gracefulTime, err := p.Load("writeNode.gracefulTime")
if err != nil {
panic(err)
}
time, err := strconv.Atoi(gracefulTime)
if err != nil {
panic(err)
}
return int64(time)
}
func (p *ParamTable) insertChannelNames() []string {
prefix, err := p.Load("msgChannel.chanNamePrefix.insert")
if err != nil {
log.Fatal(err)
}
channelRange, err := p.Load("msgChannel.channelRange.insert")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
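// Each write node consumes an equal, contiguous slice of the insert channels.
// For example (hypothetical values): 4 channels and 2 write nodes give sep = 2,
// so the node at index 0 gets channels 0-1 and the node at index 1 gets 2-3.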
sep := len(channelIDs) / p.writeNodeNum()
index := p.sliceIndex()
if index == -1 {
panic("writeNodeID not Match with Config")
}
start := index * sep
return ret[start : start+sep]
}
func (p *ParamTable) searchChannelNames() []string {
prefix, err := p.Load("msgChannel.chanNamePrefix.search")
if err != nil {
log.Fatal(err)
}
channelRange, err := p.Load("msgChannel.channelRange.search")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
return ret
}
func (p *ParamTable) searchResultChannelNames() []string {
prefix, err := p.Load("msgChannel.chanNamePrefix.searchResult")
if err != nil {
log.Fatal(err)
}
prefix += "-"
channelRange, err := p.Load("msgChannel.channelRange.searchResult")
if err != nil {
panic(err)
}
channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
var ret []string
for _, ID := range channelIDs {
ret = append(ret, prefix+strconv.Itoa(ID))
}
return ret
}
func (p *ParamTable) msgChannelSubName() string {
// TODO: subName = namePrefix + "-" + writeNodeID, writeNodeID is assigned by master
name, err := p.Load("msgChannel.subNamePrefix.writeNodeSubNamePrefix")
if err != nil {
log.Panic(err)
}
writeNodeIDStr, err := p.Load("_WriteNodeID")
if err != nil {
panic(err)
}
return name + "-" + writeNodeIDStr
}
func (p *ParamTable) writeNodeTimeTickChannelName() string {
channels, err := p.Load("msgChannel.chanNamePrefix.writeNodeTimeTick")
if err != nil {
panic(err)
}
return channels
}
func (p *ParamTable) sliceIndex() int {
writeNodeID := p.WriteNodeID()
writeNodeIDList := p.WriteNodeIDList()
for i := 0; i < len(writeNodeIDList); i++ {
if writeNodeID == writeNodeIDList[i] {
return i
}
}
return -1
}
func (p *ParamTable) writeNodeNum() int {
return len(p.WriteNodeIDList())
}

View File

@ -0,0 +1,112 @@
package writenode
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable_WriteNode(t *testing.T) {
Params.Init()
t.Run("Test PulsarAddress", func(t *testing.T) {
address, err := Params.pulsarAddress()
assert.NoError(t, err)
split := strings.Split(address, ":")
assert.Equal(t, split[0], "pulsar")
assert.Equal(t, split[len(split)-1], "6650")
})
t.Run("Test WriteNodeID", func(t *testing.T) {
id := Params.WriteNodeID()
assert.Equal(t, id, UniqueID(3))
})
t.Run("Test insertChannelRange", func(t *testing.T) {
channelRange := Params.insertChannelRange()
assert.Equal(t, len(channelRange), 2)
assert.Equal(t, channelRange[0], 0)
assert.Equal(t, channelRange[1], 2)
})
t.Run("Test statsServiceTimeInterval", func(t *testing.T) {
interval := Params.statsPublishInterval()
assert.Equal(t, interval, 1000)
})
t.Run("Test statsMsgStreamReceiveBufSize", func(t *testing.T) {
bufSize := Params.statsReceiveBufSize()
assert.Equal(t, bufSize, int64(64))
})
t.Run("Test insertMsgStreamReceiveBufSize", func(t *testing.T) {
bufSize := Params.insertReceiveBufSize()
assert.Equal(t, bufSize, int64(1024))
})
t.Run("Test searchMsgStreamReceiveBufSize", func(t *testing.T) {
bufSize := Params.searchReceiveBufSize()
assert.Equal(t, bufSize, int64(512))
})
t.Run("Test searchResultMsgStreamReceiveBufSize", func(t *testing.T) {
bufSize := Params.searchResultReceiveBufSize()
assert.Equal(t, bufSize, int64(64))
})
t.Run("Test searchPulsarBufSize", func(t *testing.T) {
bufSize := Params.searchPulsarBufSize()
assert.Equal(t, bufSize, int64(512))
})
t.Run("Test insertPulsarBufSize", func(t *testing.T) {
bufSize := Params.insertPulsarBufSize()
assert.Equal(t, bufSize, int64(1024))
})
t.Run("Test flowGraphMaxQueueLength", func(t *testing.T) {
length := Params.flowGraphMaxQueueLength()
assert.Equal(t, length, int32(1024))
})
t.Run("Test flowGraphMaxParallelism", func(t *testing.T) {
maxParallelism := Params.flowGraphMaxParallelism()
assert.Equal(t, maxParallelism, int32(1024))
})
t.Run("Test insertChannelNames", func(t *testing.T) {
names := Params.insertChannelNames()
assert.Equal(t, len(names), 2)
assert.Equal(t, names[0], "insert0")
assert.Equal(t, names[1], "insert1")
})
t.Run("Test searchChannelNames", func(t *testing.T) {
names := Params.searchChannelNames()
assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "search0")
})
t.Run("Test searchResultChannelName", func(t *testing.T) {
names := Params.searchResultChannelNames()
assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "searchResult-0")
})
t.Run("Test msgChannelSubName", func(t *testing.T) {
name := Params.msgChannelSubName()
assert.Equal(t, name, "writeNode-3")
})
t.Run("Test timeTickChannelName", func(t *testing.T) {
name := Params.writeNodeTimeTickChannelName()
assert.Equal(t, name, "writeNodeTimeTick")
})
t.Run("Test metaRootPath", func(t *testing.T) {
path := Params.metaRootPath()
assert.Equal(t, path, "by-dev/meta")
})
}

View File

@ -0,0 +1,15 @@
package writenode
import "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
IntPrimaryKey = typeutil.IntPrimaryKey
DSL = string
TimeRange struct {
timestampMin Timestamp
timestampMax Timestamp
}
)

View File

@ -0,0 +1,53 @@
package writenode
import (
"context"
)
type WriteNode struct {
ctx context.Context
WriteNodeID uint64
dataSyncService *dataSyncService
}
func NewWriteNode(ctx context.Context, writeNodeID uint64) *WriteNode {
return &WriteNode{
ctx: ctx,
WriteNodeID: writeNodeID,
dataSyncService: nil,
}
}
func (node *WriteNode) Start() {
node.dataSyncService = newDataSyncService(node.ctx)
// node.searchService = newSearchService(node.ctx)
// node.metaService = newMetaService(node.ctx)
// node.statsService = newStatsService(node.ctx)
go node.dataSyncService.start()
// go node.searchService.start()
// go node.metaService.start()
// node.statsService.start()
}
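// Close blocks until the node's context is done and then shuts down the
// services that Start launched (currently only the data sync service).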
func (node *WriteNode) Close() {
<-node.ctx.Done()
// free collectionReplica
// (*node.replica).freeAll()
// close services
if node.dataSyncService != nil {
(*node.dataSyncService).close()
}
// if node.searchService != nil {
// (*node.searchService).close()
// }
// if node.statsService != nil {
// (*node.statsService).close()
// }
}

View File

@ -13,5 +13,5 @@ SCRIPTS_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
# ignore Minio, S3 unittests
MILVUS_DIR="${SCRIPTS_DIR}/../internal/"
echo $MILVUS_DIR
go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/proxy/..." -failfast
go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/proxy/..." "${MILVUS_DIR}/writenode/..." -failfast
#go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast