Refactor manipulationReq and add TSO (pre-allocated from master)

Signed-off-by: shengjh <1572099106@qq.com>
pull/4973/head^2
shengjh 2020-10-27 12:01:27 +08:00 committed by yefu.chen
parent 848e375c0d
commit 77b2fcf015
25 changed files with 700 additions and 288 deletions

3
.env
View File

@ -1,3 +0,0 @@
REPO=milvusdb/milvus-distributed-dev
ARCH=amd64
UBUNTU=18.04

View File

@ -26,46 +26,41 @@ on:
jobs:
ubuntu:
name: AMD64 Ubuntu ${{ matrix.ubuntu }}
runs-on: ubuntu-latest
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
ubuntu: [18.04]
env:
UBUNTU: ${{ matrix.ubuntu }}
name: AMD64 ubuntu-18.04
runs-on: ubuntu-18.04
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Check Dockerfile
uses: reviewdog/action-hadolint@v1
- name: Install Dependency
run: |
./scripts/install_deps.sh
go get github.com/golang/protobuf/protoc-gen-go@v1.3.2
- name: Cache Core Thirdparty
id: cache-core
uses: actions/cache@v2
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
reporter: github-pr-check # Default is github-pr-check
- name: Docker Pull
shell: bash
path: |
./internal/core/cmake_build
key: ${{ runner.os }}-core-thirdparty
- name: Build Cpp
run: |
docker-compose pull --ignore-pull-failures ubuntu
- name: Docker Build
shell: bash
./scripts/core_build.sh -u
- name: Generate Proto Go File
run: |
docker-compose build ubuntu
docker rmi $(docker images | grep '<none>' | awk '{print $3}') || exit 0
- name: Cache Docker Volumes
uses: actions/cache@v1
with:
path: .docker
key: ubuntu${{ matrix.ubuntu }}-${{ hashFiles('internal/core/**') }}
restore-keys: ubuntu${{ matrix.ubuntu }}-
- name: Docker Run
echo `pwd`
pwd_dir=`pwd`
export PATH=$PATH:$(go env GOPATH)/bin
export protoc=${pwd_dir}/internal/core/cmake_build/thirdparty/protobuf/protobuf-build/protoc
./scripts/proto_gen_go.sh
- name: Build GO
run: |
docker-compose run ubuntu
- name: Docker Push
if: success() && github.event_name == 'push' && github.repository == 'zilliztech/milvus-distributed'
continue-on-error: true
shell: bash
go build -o ./cmd/writer/writer ./cmd/writer/writer.go
go build -o ./cmd/reader/reader ./cmd/reader/reader.go
go build -o ./cmd/master/master ./cmd/master/master.go
go build -o ./cmd/proxy/proxy ./cmd/proxy/proxy.go
- name: Docker Pull And Run
run: |
docker login -u ${{ secrets.DOCKERHUB_USER }} \
-p ${{ secrets.DOCKERHUB_TOKEN }}
docker-compose push ubuntu
docker-compose up -d
- name: Run Unittest
run: |
./scripts/run_unittest.sh

View File

@ -42,7 +42,7 @@ RUN mkdir -p /usr/local/go && wget -qO- "https://golang.org/dl/go1.15.2.linux-am
go get github.com/golang/protobuf/protoc-gen-go@v1.3.2
# Set permissions on /etc/passwd and /home to allow arbitrary users to write
COPY --chown=0:0 build/docker/env/entrypoint.sh /
COPY --chown=0:0 docker/build_env/entrypoint.sh /
RUN mkdir -p /home/user && chgrp -R 0 /home && chmod -R g=u /etc/passwd /etc/group /home && chmod +x /entrypoint.sh
ENV HOME=/home/user

View File

@ -1,25 +1,60 @@
version: '3.5'
x-ccache: &ccache
CCACHE_COMPILERCHECK: content
CCACHE_COMPRESS: 1
CCACHE_COMPRESSLEVEL: 5
CCACHE_MAXSIZE: 2G
CCACHE_DIR: /ccache
services:
ubuntu:
image: ${REPO}:${ARCH}-ubuntu${UBUNTU}
build:
context: .
dockerfile: build/docker/env/cpu/ubuntu${UBUNTU}/Dockerfile
cache_from:
- ${REPO}:${ARCH}-ubuntu${UBUNTU}
shm_size: 2G
environment:
<<: *ccache
volumes: &ubuntu-volumes
- .:/milvus-distributed:delegated
- ${DOCKER_VOLUME_DIRECTORY:-.docker}/${ARCH}-ubuntu${UBUNTU}-cache:/ccache:delegated
command: &ubuntu-command
["/bin/bash", "-l", "-c", "/milvus-distributed/scripts/core_build.sh -u"]
etcd:
image: quay.io/coreos/etcd:latest
command: etcd -listen-peer-urls=http://127.0.0.1:12380 -advertise-client-urls=http://127.0.0.1:12379 -listen-client-urls http://0.0.0.0:12379,http://0.0.0.0:14001 -initial-advertise-peer-urls=http://127.0.0.1:12380 --initial-cluster default=http://127.0.0.1:12380
ports:
- "12379:12379"
- "12380:12380"
- "14001:14001"
pulsar:
image: apachepulsar/pulsar:latest
command: bin/pulsar standalone
ports:
- "6650:6650"
- "18080:8080"
pd0:
image: pingcap/pd:latest
network_mode: "host"
ports:
- "2379:2379"
- "2380:2380"
volumes:
- /tmp/config/pd.toml:/pd.toml:ro
- /tmp/data:/data
- /tmp/logs:/logs
- /etc/localtime:/etc/localtime:ro
command:
- --name=pd0
- --client-urls=http://0.0.0.0:2379
- --peer-urls=http://0.0.0.0:2380
- --advertise-client-urls=http://127.0.0.1:2379
- --advertise-peer-urls=http://127.0.0.1:2380
- --initial-cluster=pd0=http://127.0.0.1:2380
- --data-dir=/data/pd0
- --log-file=/logs/pd0.log
restart: on-failure
tikv0:
network_mode: "host"
image: pingcap/tikv:latest
ports:
- "20160:20160"
volumes:
- /tmp/config/tikv.toml:/tikv.toml:ro
- /tmp/data:/data
- /tmp/logs:/logs
- /etc/localtime:/etc/localtime:ro
command:
- --addr=0.0.0.0:20160
- --advertise-addr=127.0.0.1:20160
- --data-dir=/data/tikv0
- --pd=127.0.0.1:2379
- --log-file=/logs/tikv0.log
depends_on:
- "pd0"
restart: on-failure

View File

@ -442,17 +442,17 @@ type TsMsg interface {
Ts() Timestamp
}
type TsMsgMarshaler interface {
Marshal(input *TsMsg) ([]byte, Status)
Unmarshal(input []byte) (*TsMsg, Status)
}
type MsgPack struct {
BeginTs Timestamp
EndTs Timestamp
Msgs []*TsMsg
}
type TsMsgMarshaler interface {
Marshal(input *TsMsg) ([]byte, Status)
Unmarshal(input []byte) (*TsMsg, Status)
}
type MsgStream interface {
SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
Produce(*MsgPack) Status
@ -461,17 +461,14 @@ type MsgStream interface {
type PulsarMsgStream struct {
client *pulsar.Client
msgHashFunc (*MsgPack) map[int32]*MsgPack // return a map from produceChannel idx to *MsgPack
producers []*pulsar.Producer
consumers []*pulsar.Consumer
produceChannels []string
consumeChannels []string
msgMarshaler *TsMsgMarshaler
msgUnmarshaler *TsMsgMarshaler
}
func (ms *PulsarMsgStream) SetProducerChannels(channels []string)
func (ms *PulsarMsgStream) SetConsumerChannels(channels []string)
func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
func (ms *PulsarMsgStream) SetMsgHashFunc(XXX)
func (ms *PulsarMsgStream) Produce(*MsgPack) Status
func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick

View File

@ -5,19 +5,10 @@ if [[ ! ${jobs+1} ]]; then
jobs=$(nproc)
fi
SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
SOURCE="$(readlink "$SOURCE")"
[[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
done
SCRIPTS_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
BUILD_OUTPUT_DIR="cmake_build"
BUILD_TYPE="Release"
BUILD_UNITTEST="OFF"
INSTALL_PREFIX=${SCRIPTS_DIR}/output
INSTALL_PREFIX=$(pwd)/output
MAKE_CLEAN="OFF"
BUILD_COVERAGE="OFF"
DB_PATH="/tmp/milvus"

View File

@ -0,0 +1,58 @@
# Download and build GoogleTest via ExternalProject, then expose the resulting
# static archives as imported targets `gtest` and `gtest_main` for the rest of
# the build to link against.
find_package(Threads REQUIRED)
include(ExternalProject)
ExternalProject_Add(
    googletest
    URL http://ss2.fluorinedog.com/data/gtest_v1.10.x.zip
    UPDATE_COMMAND ""
    INSTALL_COMMAND ""
    LOG_DOWNLOAD ON
    LOG_CONFIGURE ON
    LOG_BUILD ON)
ExternalProject_Get_Property(googletest source_dir)
set(GTEST_INCLUDE_DIRS ${source_dir}/googletest/include)
set(GMOCK_INCLUDE_DIRS ${source_dir}/googlemock/include)
# The cloning of the above repo doesn't happen until make, however if the dir doesn't
# exist, INTERFACE_INCLUDE_DIRECTORIES will throw an error.
# To make it work, we just create the directory now during config.
file(MAKE_DIRECTORY ${GTEST_INCLUDE_DIRS})
file(MAKE_DIRECTORY ${GMOCK_INCLUDE_DIRS})
ExternalProject_Get_Property(googletest binary_dir)
set(GTEST_LIBRARY_PATH ${binary_dir}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}gtest.a)
set(GTEST_LIBRARY gtest)
add_library(${GTEST_LIBRARY} UNKNOWN IMPORTED)
set_target_properties(${GTEST_LIBRARY} PROPERTIES
    "IMPORTED_LOCATION" "${GTEST_LIBRARY_PATH}"
    "IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}"
    "INTERFACE_INCLUDE_DIRECTORIES" "${GTEST_INCLUDE_DIRS}")
add_dependencies(${GTEST_LIBRARY} googletest)
set(GTEST_MAIN_LIBRARY_PATH ${binary_dir}/lib/${CMAKE_FIND_LIBRARY_PREFIXES}gtest_main.a)
set(GTEST_MAIN_LIBRARY gtest_main)
add_library(${GTEST_MAIN_LIBRARY} UNKNOWN IMPORTED)
set_target_properties(${GTEST_MAIN_LIBRARY} PROPERTIES
    "IMPORTED_LOCATION" "${GTEST_MAIN_LIBRARY_PATH}"
    # BUGFIX: this property name was misspelled "IMPORTED_LINK_INTERFACE_LIBRARImS";
    # CMake silently ignores unknown properties, so gtest_main never linked Threads.
    "IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}"
    "INTERFACE_INCLUDE_DIRECTORIES" "${GTEST_INCLUDE_DIRS}")
add_dependencies(${GTEST_MAIN_LIBRARY} googletest)
# set(GMOCK_LIBRARY_PATH ${binary_dir}/googlemock/${CMAKE_FIND_LIBRARY_PREFIXES}gmock.a)
# set(GMOCK_LIBRARY gmock)
# add_library(${GMOCK_LIBRARY} UNKNOWN IMPORTED)
# set_target_properties(${GMOCK_LIBRARY} PROPERTIES
# "IMPORTED_LOCATION" "${GMOCK_LIBRARY_PATH}"
# "IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}"
# "INTERFACE_INCLUDE_DIRECTORIES" "${GMOCK_INCLUDE_DIRS}")
# add_dependencies(${GMOCK_LIBRARY} googletest)
# set(GMOCK_MAIN_LIBRARY_PATH ${binary_dir}/googlemock/${CMAKE_FIND_LIBRARY_PREFIXES}gmock_main.a)
# set(GMOCK_MAIN_LIBRARY gmock_main)
# add_library(${GMOCK_MAIN_LIBRARY} UNKNOWN IMPORTED)
# set_target_properties(${GMOCK_MAIN_LIBRARY} PROPERTIES
# "IMPORTED_LOCATION" "${GMOCK_MAIN_LIBRARY_PATH}"
# "IMPORTED_LINK_INTERFACE_LIBRARIES" "${CMAKE_THREAD_LIBS_INIT}"
# "INTERFACE_INCLUDE_DIRECTORIES" "${GMOCK_INCLUDE_DIRS}")
# add_dependencies(${GMOCK_MAIN_LIBRARY} ${GTEST_LIBRARY})

View File

@ -29,3 +29,47 @@ add_subdirectory( log)
add_subdirectory( dog_segment)
add_subdirectory( cache )
add_subdirectory( query )
# add_subdirectory( db ) # target milvus_engine
# add_subdirectory( server )
# set(link_lib
# milvus_engine
# # dog_segment
# #query
# utils
# curl
# )
# set( BOOST_LIB libboost_system.a
# libboost_filesystem.a
# libboost_serialization.a
# )
# set( THIRD_PARTY_LIBS yaml-cpp
# )
# target_link_libraries( server
# PUBLIC ${link_lib}
# ${THIRD_PARTY_LIBS}
# ${BOOST_LIB}
# )
# # **************************** Get&Print Include Directories ****************************
# get_property( dirs DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES )
# foreach ( dir ${dirs} )
# message( STATUS "Current Include DIRS: ")
# endforeach ()
# set( SERVER_LIBS server )
# add_executable( milvus_server ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
# )
# #target_include_directories(db PUBLIC ${PROJECT_BINARY_DIR}/thirdparty/pulsar-client-cpp/pulsar-client-cpp-src/pulsar-client-cpp/include)
# target_link_libraries( milvus_server PRIVATE ${SERVER_LIBS} )
# install( TARGETS milvus_server DESTINATION bin )

View File

@ -14,6 +14,7 @@
set(KNOWHERE_THIRDPARTY_DEPENDENCIES
Arrow
FAISS
GTest
OpenBLAS
MKL
)
@ -30,6 +31,8 @@ endforeach ()
macro(build_dependency DEPENDENCY_NAME)
if ("${DEPENDENCY_NAME}" STREQUAL "Arrow")
build_arrow()
elseif ("${DEPENDENCY_NAME}" STREQUAL "GTest")
find_package(GTest REQUIRED)
elseif ("${DEPENDENCY_NAME}" STREQUAL "OpenBLAS")
build_openblas()
elseif ("${DEPENDENCY_NAME}" STREQUAL "FAISS")
@ -212,6 +215,12 @@ else ()
)
endif ()
# if (DEFINED ENV{KNOWHERE_GTEST_URL})
# set(GTEST_SOURCE_URL "$ENV{KNOWHERE_GTEST_URL}")
# else ()
# set(GTEST_SOURCE_URL
# "https://github.com/google/googletest/archive/release-${GTEST_VERSION}.tar.gz")
# endif ()
if (DEFINED ENV{KNOWHERE_OPENBLAS_URL})
set(OPENBLAS_SOURCE_URL "$ENV{KNOWHERE_OPENBLAS_URL}")
@ -374,6 +383,91 @@ if (KNOWHERE_WITH_OPENBLAS)
link_directories(SYSTEM "${OpenBLAS_LIB_DIR}")
endif()
# ----------------------------------------------------------------------
# Google gtest
# macro(build_gtest)
# message(STATUS "Building gtest-${GTEST_VERSION} from source")
# set(GTEST_VENDORED TRUE)
# set(GTEST_CMAKE_CXX_FLAGS "${EP_CXX_FLAGS}")
#
# if (APPLE)
# set(GTEST_CMAKE_CXX_FLAGS
# ${GTEST_CMAKE_CXX_FLAGS}
# -DGTEST_USE_OWN_TR1_TUPLE=1
# -Wno-unused-value
# -Wno-ignored-attributes)
# endif ()
#
# set(GTEST_PREFIX "${INDEX_BINARY_DIR}/googletest_ep-prefix/src/googletest_ep")
# set(GTEST_INCLUDE_DIR "${GTEST_PREFIX}/include")
# set(GTEST_STATIC_LIB
# "${GTEST_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}gtest${CMAKE_STATIC_LIBRARY_SUFFIX}")
# set(GTEST_MAIN_STATIC_LIB
# "${GTEST_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}gtest_main${CMAKE_STATIC_LIBRARY_SUFFIX}")
#
# set(GTEST_CMAKE_ARGS
# ${EP_COMMON_CMAKE_ARGS}
# "-DCMAKE_INSTALL_PREFIX=${GTEST_PREFIX}"
# "-DCMAKE_INSTALL_LIBDIR=lib"
# -DCMAKE_CXX_FLAGS=${GTEST_CMAKE_CXX_FLAGS}
# -DCMAKE_BUILD_TYPE=Release)
#
# set(GMOCK_INCLUDE_DIR "${GTEST_PREFIX}/include")
# set(GMOCK_STATIC_LIB
# "${GTEST_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}gmock${CMAKE_STATIC_LIBRARY_SUFFIX}"
# )
#
# ExternalProject_Add(googletest_ep
# URL
# ${GTEST_SOURCE_URL}
# BUILD_COMMAND
# ${MAKE}
# ${MAKE_BUILD_ARGS}
# BUILD_BYPRODUCTS
# ${GTEST_STATIC_LIB}
# ${GTEST_MAIN_STATIC_LIB}
# ${GMOCK_STATIC_LIB}
# CMAKE_ARGS
# ${GTEST_CMAKE_ARGS}
# ${EP_LOG_OPTIONS})
#
# # The include directory must exist before it is referenced by a target.
# file(MAKE_DIRECTORY "${GTEST_INCLUDE_DIR}")
#
# add_library(gtest STATIC IMPORTED)
# set_target_properties(gtest
# PROPERTIES IMPORTED_LOCATION "${GTEST_STATIC_LIB}"
# INTERFACE_INCLUDE_DIRECTORIES "${GTEST_INCLUDE_DIR}")
#
# add_library(gtest_main STATIC IMPORTED)
# set_target_properties(gtest_main
# PROPERTIES IMPORTED_LOCATION "${GTEST_MAIN_STATIC_LIB}"
# INTERFACE_INCLUDE_DIRECTORIES "${GTEST_INCLUDE_DIR}")
#
# add_library(gmock STATIC IMPORTED)
# set_target_properties(gmock
# PROPERTIES IMPORTED_LOCATION "${GMOCK_STATIC_LIB}"
# INTERFACE_INCLUDE_DIRECTORIES "${GTEST_INCLUDE_DIR}")
#
# add_dependencies(gtest googletest_ep)
# add_dependencies(gtest_main googletest_ep)
# add_dependencies(gmock googletest_ep)
#
# endmacro()
## if (KNOWHERE_BUILD_TESTS AND NOT TARGET googletest_ep)
#if ( NOT TARGET gtest AND KNOWHERE_BUILD_TESTS )
# resolve_dependency(GTest)
#
# if (NOT GTEST_VENDORED)
# endif ()
#
# # TODO: Don't use global includes but rather target_include_directories
# get_target_property(GTEST_INCLUDE_DIR gtest INTERFACE_INCLUDE_DIRECTORIES)
# link_directories(SYSTEM "${GTEST_PREFIX}/lib")
# include_directories(SYSTEM ${GTEST_INCLUDE_DIR})
#endif ()
# ----------------------------------------------------------------------
# MKL

View File

@ -0,0 +1,50 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <getopt.h>
#include <unistd.h>
#include <csignal>
#include <cstring>
#include <string>
#include "config/ConfigMgr.h"
#include "easyloggingpp/easylogging++.h"
#include "utils/SignalHandler.h"
#include "utils/Status.h"
INITIALIZE_EASYLOGGINGPP
void
// Print command-line usage for this binary to stdout.
// NOTE(review): the flags listed below are documentation of intent only —
// option parsing (getopt) is not wired up anywhere visible in this file yet.
print_help(const std::string& app_name) {
    std::cout << std::endl << "Usage: " << app_name << " [OPTIONS]" << std::endl;
    // Raw string: interior lines are emitted verbatim, so they stay unindented.
    std::cout << R"(
Options:
-h --help Print this help.
-c --conf_file filename Read configuration from the file.
-d --daemon Daemonize this application.
-p --pid_file filename PID file used by daemonized app.
)" << std::endl;
}
void
// Print the ASCII-art "MILVUS" startup banner to stdout.
print_banner() {
    std::cout << std::endl;
    std::cout << " __ _________ _ ____ ______ " << std::endl;
    std::cout << " / |/ / _/ /| | / / / / / __/ " << std::endl;
    std::cout << " / /|_/ // // /_| |/ / /_/ /\\ \\ " << std::endl;
    std::cout << " /_/ /_/___/____/___/\\____/___/ " << std::endl;
    std::cout << std::endl;
}
int
// Entry point. Currently only prints the banner; argc/argv are accepted for
// the planned option parsing (see print_help) but are not consumed yet.
// Returns 0 implicitly per the C++ standard's special rule for main.
main(int argc, char* argv[]) {
    print_banner();
}

View File

@ -32,12 +32,6 @@ set( FETCHCONTENT_QUIET OFF )
set( THREADS_PREFER_PTHREAD_FLAG ON )
find_package( Threads REQUIRED )
# ****************************** Thirdparty googletest ***************************************
if ( MILVUS_BUILD_TESTS )
add_subdirectory( gtest )
endif()
# ****************************** Thirdparty yaml ***************************************
if ( MILVUS_WITH_YAMLCPP )
add_subdirectory( yaml-cpp )

View File

@ -1,64 +0,0 @@
#-------------------------------------------------------------------------------
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
if ( DEFINED ENV{MILVUS_GTEST_URL} )
set( GTEST_SOURCE_URL "$ENV{MILVUS_GTEST_URL}" )
else()
set( GTEST_SOURCE_URL "https://gitee.com/quicksilver/googletest/repository/archive/release-${GTEST_VERSION}.zip" )
endif()
message( STATUS "Building gtest-${GTEST_VERSION} from source" )
set( CMAKE_POLICY_DEFAULT_CMP0022 NEW ) # for googletest only
FetchContent_Declare(
googletest
URL ${GTEST_SOURCE_URL}
URL_MD5 "f9137c5bc18b7d74027936f0f1bfa5c8"
SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}/googletest-src
BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/googletest-build
DOWNLOAD_DIR ${THIRDPARTY_DOWNLOAD_PATH} )
FetchContent_GetProperties( googletest )
if ( NOT googletest_POPULATED )
FetchContent_Populate( googletest )
# Adding the following targets:
# gtest, gtest_main, gmock, gmock_main
add_subdirectory( ${googletest_SOURCE_DIR}
${googletest_BINARY_DIR}
EXCLUDE_FROM_ALL )
endif()
# ****************************************************************
# Create ALIAS Target
# ****************************************************************
# if (NOT TARGET GTest:gtest)
# add_library( GTest::gtest ALIAS gtest )
# endif()
# if (NOT TARGET GTest:main)
# add_library( GTest::main ALIAS gtest_main )
# endif()
# if (NOT TARGET GMock:gmock)
# target_link_libraries( gmock INTERFACE GTest::gtest )
# add_library( GMock::gmock ALIAS gmock )
# endif()
# if (NOT TARGET GMock:main)
# target_link_libraries( gmock_main INTERFACE GTest::gtest )
# add_library( GMock::main ALIAS gmock_main )
# endif()
get_property( var DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/googletest-src" PROPERTY COMPILE_OPTIONS )
message( STATUS "gtest compile options: ${var}" )

View File

@ -31,7 +31,6 @@ FetchContent_Declare(
)
set( protobuf_BUILD_TESTS CACHE BOOL OFF FORCE )
if ( NOT protobuf_POPULATED )
FetchContent_Populate( protobuf )

View File

@ -1,7 +1,8 @@
enable_testing()
find_package(GTest REQUIRED)
include_directories(${CMAKE_HOME_DIRECTORY}/src)
include_directories(${CMAKE_HOME_DIRECTORY}/src/index/knowhere)
include_directories(>>>> ${CMAKE_HOME_DIRECTORY}/src/index/knowhere)
set(MILVUS_TEST_FILES
test_naive.cpp
test_dog_segment.cpp
@ -21,4 +22,4 @@ target_link_libraries(all_tests
log
pthread
)
install (TARGETS all_tests DESTINATION unittest)
install (TARGETS all_tests DESTINATION unittest)

View File

@ -1,34 +1,52 @@
package proxy
import (
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
pb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
//pb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
pb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"log"
"sync"
)
type manipulationReq struct {
pb.ManipulationReqMsg
stats []commonpb.Status
msgs []*pb.ManipulationReqMsg
wg sync.WaitGroup
proxy *proxyServer
}
// TsMsg interfaces
func (req *manipulationReq) Ts() Timestamp {
return Timestamp(req.Timestamp)
func (req *manipulationReq) Ts() (Timestamp, error) {
if req.msgs == nil {
return 0, errors.New("No typed manipulation request message in ")
}
return Timestamp(req.msgs[0].Timestamp), nil
}
func (req *manipulationReq) SetTs(ts Timestamp) {
req.Timestamp = uint64(ts)
for _, msg := range req.msgs {
msg.Timestamp = uint64(ts)
}
}
// BaseRequest interfaces
func (req *manipulationReq) Type() pb.ReqType {
return req.ReqType
if req.msgs == nil {
return 0
}
return req.msgs[0].ReqType
}
// TODO: use a ProcessReq function to wrap details?
// like func (req *manipulationReq) ProcessReq() commonpb.Status{
// req.PreExecute()
// req.Execute()
// req.PostExecute()
// req.WaitToFinish()
//}
func (req *manipulationReq) PreExecute() commonpb.Status {
return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}
}
@ -43,13 +61,28 @@ func (req *manipulationReq) PostExecute() commonpb.Status { // send into pulsar
return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}
}
func (req *manipulationReq) WaitToFinish() commonpb.Status { // wait unitl send into pulsar
func (req *manipulationReq) WaitToFinish() commonpb.Status { // wait until send into pulsar
req.wg.Wait()
return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}
for _, stat := range req.stats{
if stat.ErrorCode != commonpb.ErrorCode_SUCCESS{
return stat
}
}
// update timestamp if necessary
ts, _ := req.Ts()
req.proxy.reqSch.mTimestampMux.Lock()
defer req.proxy.reqSch.mTimestampMux.Unlock()
if req.proxy.reqSch.mTimestamp <= ts {
req.proxy.reqSch.mTimestamp = ts
} else {
log.Printf("there is some wrong with m_timestamp, it goes back, current = %d, previous = %d", ts, req.proxy.reqSch.mTimestamp)
}
return req.stats[0]
}
func (s *proxyServer) restartManipulationRoutine(buf_size int) error {
s.reqSch.manipulationsChan = make(chan *manipulationReq, buf_size)
func (s *proxyServer) restartManipulationRoutine(bufSize int) error {
s.reqSch.manipulationsChan = make(chan *manipulationReq, bufSize)
pulsarClient, err := pulsar.NewClient(pulsar.ClientOptions{URL: s.pulsarAddr})
if err != nil {
return err
@ -80,51 +113,58 @@ func (s *proxyServer) restartManipulationRoutine(buf_size int) error {
case ip := <-s.reqSch.manipulationsChan:
ts, st := s.getTimestamp(1)
if st.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("get time stamp failed, error code = %d, msg = %s, drop inset rows = %d", st.ErrorCode, st.Reason, len(ip.RowsData))
continue
}
mq := pb.ManipulationReqMsg{
CollectionName: ip.CollectionName,
PartitionTag: ip.PartitionTag,
PrimaryKeys: ip.PrimaryKeys,
RowsData: ip.RowsData,
Timestamp: uint64(ts[0]),
SegmentId: ip.SegmentId,
ChannelId: ip.ChannelId,
ReqType: ip.ReqType,
ProxyId: ip.ProxyId,
ExtraParams: ip.ExtraParams,
}
mb, err := proto.Marshal(&mq)
if err != nil {
log.Printf("Marshal ManipulationReqMsg failed, error = %v", err)
continue
}
switch ip.ReqType {
case pb.ReqType_kInsert:
if _, err := readers[mq.ChannelId].Send(s.ctx, &pulsar.ProducerMessage{Payload: mb}); err != nil {
log.Printf("post into puslar failed, error = %v", err)
}
break
case pb.ReqType_kDeleteEntityByID:
if _, err = deleter.Send(s.ctx, &pulsar.ProducerMessage{Payload: mb}); err != nil {
log.Printf("post into pulsar filed, error = %v", err)
}
default:
log.Printf("post unexpect ReqType = %d", ip.ReqType)
log.Printf("get time stamp failed, error code = %d, msg = %s", st.ErrorCode, st.Reason)
ip.stats[0] = st
ip.wg.Done()
break
}
s.reqSch.m_timestamp_mux.Lock()
if s.reqSch.m_timestamp <= ts[0] {
s.reqSch.m_timestamp = ts[0]
} else {
log.Printf("there is some wrong with m_timestamp, it goes back, current = %d, previous = %d", ts[0], s.reqSch.m_timestamp)
ip.SetTs(ts[0])
wg := sync.WaitGroup{}
for i, mq := range ip.msgs {
mq := mq
i := i
wg.Add(1)
go func() {
defer wg.Done()
mb, err := proto.Marshal(mq)
if err != nil {
log.Printf("Marshal ManipulationReqMsg failed, error = %v", err)
ip.stats[i] = commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("Marshal ManipulationReqMsg failed, error=%v", err),
}
return
}
switch ip.Type() {
case pb.ReqType_kInsert:
if _, err := readers[mq.ChannelId].Send(s.ctx, &pulsar.ProducerMessage{Payload: mb}); err != nil {
log.Printf("post into puslar failed, error = %v", err)
ip.stats[i] = commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("Post into puslar failed, error=%v", err.Error()),
}
return
}
case pb.ReqType_kDeleteEntityByID:
if _, err = deleter.Send(s.ctx, &pulsar.ProducerMessage{Payload: mb}); err != nil {
log.Printf("post into pulsar filed, error = %v", err)
ip.stats[i] = commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("Post into puslar failed, error=%v", err.Error()),
}
return
}
default:
log.Printf("post unexpect ReqType = %d", ip.Type())
return
}
}()
}
s.reqSch.m_timestamp_mux.Unlock()
wg.Wait()
ip.wg.Done()
break
}
}
}()

View File

@ -0,0 +1,23 @@
package mock
import (
"context"
"sync"
"time"
)
// timeWindow is the validity window reported with every timestamp batch
// (1 second by default).
const timeWindow = time.Second

// TSOClient is an in-memory mock of the timestamp-oracle service. It hands
// out monotonically increasing timestamps and is safe for concurrent use.
type TSOClient struct {
	lastTs uint64     // next timestamp to be handed out
	mux    sync.Mutex // guards lastTs
}

// GetTimeStamp reserves n consecutive timestamps and returns the first of
// them, the count actually granted (always n in this mock), the validity
// window, and a nil error. The ctx parameter is unused here; it exists so
// the signature matches a real RPC-backed TSO client.
func (c *TSOClient) GetTimeStamp(ctx context.Context, n uint64) (ts uint64, count uint64, window time.Duration, err error) {
	c.mux.Lock()
	first := c.lastTs
	c.lastTs += n
	c.mux.Unlock()
	return first, n, timeWindow, nil
}

View File

@ -5,9 +5,8 @@ import (
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
etcd "go.etcd.io/etcd/clientv3"
"strconv"
)
@ -55,6 +54,7 @@ type ProxyOptions struct {
func ReadProxyOptionsFromConfig() (*ProxyOptions, error) {
conf.LoadConfig("config.yaml")
etcdRootPath := conf.Config.Etcd.Rootpath
if etcdRootPath[len(etcdRootPath)-1] == '/' {
etcdRootPath = etcdRootPath[0 : len(etcdRootPath)-1]
@ -127,21 +127,11 @@ func StartProxy(opt *ProxyOptions) error {
ctx: opt.ctx,
}
//errChan := make(chan error, 1)
//go func() {
// err := startProxyServer(srv)
// errChan <- err
//}()
err = startProxyServer(srv)
if err != nil {
return err
}
//wait unit grpc server has started
//if err := <-errChan; err != nil {
// return err
//}
////////////////////////// time tick /////////////////////////////////
ttClient, err := pulsar.NewClient(pulsar.ClientOptions{URL: opt.pulsarAddr})
if err != nil {

View File

@ -89,13 +89,13 @@ func (s *proxyServer) restartQueryRoutine(buf_size int) error {
if _, err := query.Send(s.ctx, &pulsar.ProducerMessage{Payload: qb}); err != nil {
log.Printf("post into puslar failed, error = %v", err)
}
s.reqSch.q_timestamp_mux.Lock()
if s.reqSch.q_timestamp <= ts[0] {
s.reqSch.q_timestamp = ts[0]
s.reqSch.qTimestampMux.Lock()
if s.reqSch.qTimestamp <= ts[0] {
s.reqSch.qTimestamp = ts[0]
} else {
log.Printf("there is some wrong with q_timestamp, it goes back, current = %d, previous = %d", ts[0], s.reqSch.q_timestamp)
log.Printf("there is some wrong with q_timestamp, it goes back, current = %d, previous = %d", ts[0], s.reqSch.qTimestamp)
}
s.reqSch.q_timestamp_mux.Unlock()
s.reqSch.qTimestampMux.Unlock()
resultMap[qm.ReqId] = qm
//log.Printf("start search, query id = %d", qm.QueryId)
case cm, ok := <-result.Chan():

View File

@ -7,13 +7,13 @@ type requestScheduler struct {
//manipulations requestQueue
manipulationsChan chan *manipulationReq // manipulation queue
m_timestamp Timestamp
m_timestamp_mux sync.Mutex
mTimestamp Timestamp
mTimestampMux sync.Mutex
//queries requestQueue
queryChan chan *queryReq
q_timestamp Timestamp
q_timestamp_mux sync.Mutex
queryChan chan *queryReq
qTimestamp Timestamp
qTimestampMux sync.Mutex
}
// @param selection
@ -26,9 +26,9 @@ func (rs *requestScheduler) AreRequestsDelivered(ts Timestamp, selection uint32)
if selection&uint32(2) == 0 {
return true
}
rs.m_timestamp_mux.Lock()
defer rs.m_timestamp_mux.Unlock()
if rs.m_timestamp >= ts {
rs.mTimestampMux.Lock()
defer rs.mTimestampMux.Unlock()
if rs.mTimestamp >= ts {
return true
}
if len(rs.manipulationsChan) == 0 {
@ -41,9 +41,9 @@ func (rs *requestScheduler) AreRequestsDelivered(ts Timestamp, selection uint32)
if selection&uint32(4) == 0 {
return true
}
rs.q_timestamp_mux.Lock()
defer rs.q_timestamp_mux.Unlock()
if rs.q_timestamp >= ts {
rs.qTimestampMux.Lock()
defer rs.qTimestampMux.Unlock()
if rs.qTimestamp >= ts {
return true
}
if len(rs.queryChan) == 0 {

View File

@ -146,28 +146,30 @@ func (s *proxyServer) ShowPartitions(ctx context.Context, req *servicepb.Collect
func (s *proxyServer) DeleteByID(ctx context.Context, req *pb.DeleteByIDParam) (*commonpb.Status, error) {
log.Printf("delete entites, total = %d", len(req.IdArray))
pm := &manipulationReq{
ManipulationReqMsg: pb.ManipulationReqMsg{
CollectionName: req.CollectionName,
ReqType: pb.ReqType_kDeleteEntityByID,
ProxyId: s.proxyId,
},
proxy: s,
mReqMsg := pb.ManipulationReqMsg{
CollectionName: req.CollectionName,
ReqType: pb.ReqType_kDeleteEntityByID,
ProxyId: s.proxyId,
}
for _, id := range req.IdArray {
pm.PrimaryKeys = append(pm.PrimaryKeys, uint64(id))
mReqMsg.PrimaryKeys = append(mReqMsg.PrimaryKeys, uint64(id))
}
if len(pm.PrimaryKeys) > 1 {
if st := pm.PreExecute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
if len(mReqMsg.PrimaryKeys) > 1 {
mReq := &manipulationReq{
stats: make([]commonpb.Status, 1),
msgs: append([]*pb.ManipulationReqMsg{}, &mReqMsg),
proxy: s,
}
if st := mReq.PreExecute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
return &st, nil
}
if st := pm.Execute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
if st := mReq.Execute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
return &st, nil
}
if st := pm.PostExecute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
if st := mReq.PostExecute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
return &st, nil
}
if st := pm.WaitToFinish(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
if st := mReq.WaitToFinish(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
return &st, nil
}
}
@ -176,7 +178,7 @@ func (s *proxyServer) DeleteByID(ctx context.Context, req *pb.DeleteByIDParam) (
func (s *proxyServer) Insert(ctx context.Context, req *servicepb.RowBatch) (*servicepb.IntegerRangeResponse, error) {
log.Printf("Insert Entities, total = %d", len(req.RowData))
ipm := make(map[uint32]*manipulationReq)
msgMap := make(map[uint32]*pb.ManipulationReqMsg)
//TODO check collection schema's auto_id
if len(req.RowData) == 0 { //primary key is empty, set primary key by server
@ -198,50 +200,55 @@ func (s *proxyServer) Insert(ctx context.Context, req *servicepb.RowBatch) (*ser
return nil, status.Errorf(codes.Unknown, "hash failed on %d", key)
}
hash = hash % uint32(len(s.readerTopics))
ip, ok := ipm[hash]
ipm, ok := msgMap[hash]
if !ok {
segId, err := s.getSegmentId(int32(hash), req.CollectionName)
if err != nil {
return nil, err
}
ipm[hash] = &manipulationReq{
ManipulationReqMsg: pb.ManipulationReqMsg{
CollectionName: req.CollectionName,
PartitionTag: req.PartitionTag,
SegmentId: segId,
ChannelId: uint64(hash),
ReqType: pb.ReqType_kInsert,
ProxyId: s.proxyId,
//ExtraParams: req.ExtraParams,
},
proxy: s,
msgMap[hash] = &pb.ManipulationReqMsg{
CollectionName: req.CollectionName,
PartitionTag: req.PartitionTag,
SegmentId: segId,
ChannelId: uint64(hash),
ReqType: pb.ReqType_kInsert,
ProxyId: s.proxyId,
//ExtraParams: req.ExtraParams,
}
ip = ipm[hash]
ipm = msgMap[hash]
}
ip.PrimaryKeys = append(ip.PrimaryKeys, key)
ip.RowsData = append(ip.RowsData, &pb.RowData{Blob: req.RowData[i].Value}) // czs_tag
ipm.PrimaryKeys = append(ipm.PrimaryKeys, key)
ipm.RowsData = append(ipm.RowsData, &pb.RowData{Blob: req.RowData[i].Value}) // czs_tag
}
for _, ip := range ipm {
if st := ip.PreExecute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS { //do nothing
return &servicepb.IntegerRangeResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, nil
}
if st := ip.Execute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS { // push into chan
return &servicepb.IntegerRangeResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, nil
}
if st := ip.PostExecute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS { //post to pulsar
return &servicepb.IntegerRangeResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, nil
}
// TODO: alloc manipulation request id
mReq := manipulationReq{
stats: make([]commonpb.Status, len(msgMap)),
msgs: make([]*pb.ManipulationReqMsg, len(msgMap)),
wg: sync.WaitGroup{},
proxy: s,
}
for _, ip := range ipm {
if st := ip.WaitToFinish(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("Wait to finish failed, error code = %d", st.ErrorCode)
}
for _, v := range msgMap {
mReq.msgs = append(mReq.msgs, v)
}
if st := mReq.PreExecute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS { //do nothing
return &servicepb.IntegerRangeResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, nil
}
if st := mReq.Execute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS { // push into chan
return &servicepb.IntegerRangeResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, nil
}
if st := mReq.PostExecute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS { //post to pulsar
return &servicepb.IntegerRangeResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, nil
}
if st := mReq.WaitToFinish(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("Wait to finish failed, error code = %d", st.ErrorCode)
}
return &servicepb.IntegerRangeResponse{

View File

@ -370,8 +370,8 @@ func TestProxyServer_InsertAndDelete(t *testing.T) {
for i := 0; i < 5; i++ {
assert.Equalf(t, primaryKey[i], uint64(i+1), "insert failed")
}
t.Logf("m_timestamp = %d", ps.reqSch.m_timestamp)
assert.Equalf(t, ps.reqSch.m_timestamp, Timestamp(1300), "insert failed")
t.Logf("m_timestamp = %d", ps.reqSch.mTimestamp)
assert.Equalf(t, ps.reqSch.mTimestamp, Timestamp(1300), "insert failed")
}
func TestProxyServer_Search(t *testing.T) {

125
internal/proxy/tso.go Normal file
View File

@ -0,0 +1,125 @@
package proxy
import (
"context"
"fmt"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proxy/mock"
"log"
"sync"
"time"
)
// tsCountPerRPC is the count of timestamps requested from master per RPC
// (2<<18 * 10 = 5,242,880).
const tsCountPerRPC = 2 << 18 * 10

// defaultUpdateInterval is the interval between requesting a batch of
// timestamps from master.
const defaultUpdateInterval = time.Millisecond * 1000
// Oracle is the interface that provides strictly ascending timestamps.
type Oracle interface {
	// GetTimestamp reserves count consecutive timestamps and returns the
	// first one; per the MatserOracle implementation the caller owns the
	// half-open range [ts, ts+count) — confirm for any other implementation.
	GetTimestamp(ctx context.Context, count uint32) (uint64, error)
	// Close releases resources held by the oracle (e.g. background goroutines).
	Close()
}
type tsWithTTL struct {
ts uint64
count uint64
expireTime time.Time
}
func (ts *tsWithTTL) IsExpired() bool {
now := time.Now()
return now.Sub(ts.expireTime) >= 0
}
func (ts *tsWithTTL) CanAllocTs(count uint32) bool {
return !ts.IsExpired() && ts.count >= uint64(count)
}
// MatserOracle implements the Oracle interface, providing strictly ascending
// timestamps. It requests and caches a batch of timestamps from master and
// refreshes the cache periodically. NOTE(review): the type name carries a
// typo ("Matser"); renaming is an exported-API change, so it is kept as-is.
// Contains a mutex, so values must not be copied after first use.
type MatserOracle struct {
	c      *mock.TSOClient // RPC client used to fetch timestamp batches from master
	lastTs *tsWithTTL      // currently cached batch; guarded by mux
	quit   chan struct{}   // closed by Close to stop the update goroutine
	mux    sync.RWMutex    // protects lastTs
}
// NewMasterTSO builds an Oracle backed by the given master TSO client and
// starts a background goroutine that refreshes the cached timestamp batch
// every defaultUpdateInterval. The returned error is currently always nil.
func NewMasterTSO(client *mock.TSOClient) (Oracle, error) {
	oracle := &MatserOracle{
		c:      client,
		lastTs: new(tsWithTTL), // zero value: an empty, already-expired batch
		quit:   make(chan struct{}),
	}
	go oracle.UpdateLastTs(defaultUpdateInterval)
	return oracle, nil
}
// UpdateLastTs periodically requests a fresh batch of timestamps from master
// and installs it via SetTs. It runs until Close is called (which closes the
// quit channel); NewMasterTSO starts it as a goroutine.
func (o *MatserOracle) UpdateLastTs(updateInterval time.Duration) {
	tick := time.NewTicker(updateInterval)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			// TODO: plumb a real context so in-flight RPCs stop on Close.
			ctx := context.TODO()
			ts, count, tw, err := o.c.GetTimeStamp(ctx, tsCountPerRPC)
			if err != nil {
				// Keep the previous (possibly expired) batch and retry on the
				// next tick. Previously this error was silently dropped.
				log.Printf("UpdateLastTs: get timestamp from master failed: %v", err)
				continue
			}
			o.SetTs(ts, count, tw)
		case <-o.quit:
			return
		}
	}
}
// SetTs installs a freshly fetched batch of timestamps: starting value ts,
// count available, valid for timeWindow from now. Batches that would move
// time backwards are ignored so the oracle stays strictly ascending; the
// very first batch (while lastTs.ts is still 0) is always accepted.
func (o *MatserOracle) SetTs(ts uint64, count uint64, timeWindow time.Duration) {
	o.mux.Lock()
	defer o.mux.Unlock()
	if o.lastTs.ts != 0 && ts <= o.lastTs.ts {
		// Stale or duplicate batch; keep the current one.
		return
	}
	o.lastTs.ts = ts
	o.lastTs.count = count
	o.lastTs.expireTime = time.Now().Add(timeWindow)
}
// GetTimestamp allocates count strictly ascending timestamps from the cached
// batch and returns the first one; the caller owns [ts, ts+count). It retries
// for a bounded time while waiting for the background updater to install a
// usable batch, honoring ctx cancellation while it waits (previously the
// retry sleep ignored ctx — the TODO below the signature). It returns an
// error if count exceeds one RPC batch or no valid batch appears in time.
func (o *MatserOracle) GetTimestamp(ctx context.Context, count uint32) (uint64, error) {
	if count > tsCountPerRPC {
		return 0, errors.New("Can't alloc too large count timestamps, count must less than " + fmt.Sprintf("%v", tsCountPerRPC))
	}
	maxRetry := 10
	for i := 0; i < maxRetry; i++ {
		o.mux.RLock()
		retry := !o.lastTs.CanAllocTs(count)
		o.mux.RUnlock()
		if !retry {
			break
		}
		// Wait for the background updater to refresh the batch, but bail out
		// early if the caller's context is canceled.
		log.Printf("MasterOracle GetTimeStamp, retry count: %v", i+1)
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-time.After(time.Millisecond * 100):
		}
	}
	o.mux.Lock()
	defer o.mux.Unlock()
	// The batch may still be unusable, or a concurrent caller may have
	// consumed it between the RUnlock above and this Lock; re-check before
	// allocating.
	if !o.lastTs.CanAllocTs(count) {
		return 0, errors.New("MasterOracle GetTimeStamp failed, exceeds max retry times")
	}
	ts := o.lastTs.ts
	o.lastTs.ts += uint64(count)
	o.lastTs.count -= uint64(count)
	return ts, nil
}
// Close stops the background update goroutine by closing the quit channel.
// It must be called at most once: a second call would panic on the double
// close of the channel.
func (o *MatserOracle) Close() {
	close(o.quit)
}

View File

@ -0,0 +1,36 @@
package proxy
import (
"context"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proxy/mock"
"testing"
"time"
)
// TestMatserOracle_GetTimestamp exercises the master-backed TSO: timestamps
// must be strictly ascending across calls, must keep ascending after the
// cached batch is refreshed, and over-sized requests must be rejected.
func TestMatserOracle_GetTimestamp(t *testing.T) {
	tso, err := NewMasterTSO(&mock.TSOClient{})
	assert.Nil(t, err) // previously the constructor error was silently discarded
	defer tso.Close()
	ctx := context.TODO()

	ts0, err := tso.GetTimestamp(ctx, 100)
	assert.Nil(t, err)
	ts1, err := tso.GetTimestamp(ctx, 100)
	t.Logf("ts0=%v, ts1=%v", ts0, ts1)
	assert.Nil(t, err)
	assert.Greater(t, ts1, ts0)
	assert.Greater(t, ts1, ts0+99)

	// Wait long enough for the background updater to fetch a fresh batch,
	// then verify timestamps still ascend.
	time.Sleep(time.Second * 3)
	ts0, err = tso.GetTimestamp(ctx, 100)
	assert.Nil(t, err)
	ts1, err = tso.GetTimestamp(ctx, 100)
	t.Logf("ts0=%v, ts1=%v", ts0, ts1)
	assert.Nil(t, err)
	assert.Greater(t, ts1, ts0)
	assert.Greater(t, ts1, ts0+99)

	// Requests larger than one RPC batch must be rejected.
	_, err = tso.GetTimestamp(ctx, 2<<30)
	assert.NotNil(t, err)
	t.Log(err)
}

View File

@ -19,7 +19,7 @@ CPP_BUILD_DIR="${CPP_SRC_DIR}/cmake_build"
BUILD_OUTPUT_DIR=${CPP_BUILD_DIR}
BUILD_TYPE="Release"
BUILD_UNITTEST="OFF"
INSTALL_PREFIX="${CPP_SRC_DIR}/output"
INSTALL_PREFIX="${CPP_SRC_DIR}/milvus"
MAKE_CLEAN="OFF"
BUILD_COVERAGE="OFF"
DB_PATH="/tmp/milvus"