refine complie configuration (#17502)

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
pull/17713/head
Enwei Jiao 2022-06-24 21:12:15 +08:00 committed by GitHub
parent d512b5f98b
commit 16c3aedc15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 1461 additions and 1872 deletions

2
.gitignore vendored
View File

@ -11,7 +11,6 @@
internal/core/output/*
internal/core/build/*
internal/kv/rocksdb/cwrapper/output/*
**/.idea/*
internal/msgstream/pulsarms/client-cpp/build/
internal/msgstream/pulsarms/client-cpp/build/*
@ -56,7 +55,6 @@ lib/
.DS_Store
*.sw[po]
cwrapper_build
**/cwrapper_rocksdb_build/
**/.clangd/*
**/compile_commands.json
**/.lint

View File

@ -19,20 +19,10 @@ LIBRARY_PATH := $(PWD)/lib
OS := $(shell uname -s)
ARCH := $(shell arch)
mode = Release
PKG_CONFIG = $(shell echo "${PKG_CONFIG_PATH}:`pwd`/internal/core/output/lib/pkgconfig:`pwd`/internal/core/output/lib64/pkgconfig")
all: build-cpp build-go
pre-proc:
@echo "Running pre-processing"
ifeq ($(OS),Darwin) # MacOS X
@echo "MacOS system identified. Switching to customized gorocksdb fork..."
@go mod edit -replace=github.com/tecbot/gorocksdb=github.com/soothing-rain/gorocksdb@v0.0.1
endif
ifeq ($(MSYSTEM), MINGW64) # MSYS
@echo "MSYS. Switching to customized gorocksdb fork..."
@go mod edit -replace=github.com/tecbot/gorocksdb=github.com/soothing-rain/gorocksdb@v0.0.1
endif
get-build-deps:
@(env bash $(PWD)/scripts/install_deps.sh)
@ -76,8 +66,8 @@ lint: tools/bin/revive
static-check:
@echo "Running $@ check"
@GO111MODULE=on ${GOPATH}/bin/golangci-lint cache clean
@GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=30m --config ./.golangci.yml ./internal/...
@GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=30m --config ./.golangci.yml ./cmd/...
@GO111MODULE=on PKG_CONFIG_PATH="$(PKG_CONFIG)" ${GOPATH}/bin/golangci-lint run --timeout=30m --config ./.golangci.yml ./internal/...
@GO111MODULE=on PKG_CONFIG_PATH="$(PKG_CONFIG)" ${GOPATH}/bin/golangci-lint run --timeout=30m --config ./.golangci.yml ./cmd/...
# @GO111MODULE=on ${GOPATH}/bin/golangci-lint run --timeout=30m --config ./.golangci.yml ./tests/go_client/...
verifiers: build-cpp getdeps cppcheck fmt static-check
@ -106,42 +96,37 @@ print-build-info:
milvus: build-cpp print-build-info
@echo "Building Milvus ..."
@echo "if build fails on Mac M1 machines, you probably need to rerun scripts/install_deps.sh and then run: \`export PKG_CONFIG_PATH=\"/opt/homebrew/opt/openssl@3/lib/pkgconfig\"\`"
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build \
-ldflags="-X '$(OBJPREFIX).BuildTags=$(BUILD_TAGS)' -X '$(OBJPREFIX).BuildTime=$(BUILD_TIME)' -X '$(OBJPREFIX).GitCommit=$(GIT_COMMIT)' -X '$(OBJPREFIX).GoVersion=$(GO_VERSION)'" \
${APPLE_SILICON_FLAG} -o $(INSTALL_PATH)/milvus $(PWD)/cmd/main.go 1>/dev/null
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && \
export PKG_CONFIG_PATH="$(PKG_CONFIG)" && \
export RPATH=$$(pkg-config --libs-only-L milvus_common | cut -c 3-) && \
GO111MODULE=on $(GO) build -ldflags="-r $${RPATH} -X '$(OBJPREFIX).BuildTags=$(BUILD_TAGS)' -X '$(OBJPREFIX).BuildTime=$(BUILD_TIME)' -X '$(OBJPREFIX).GitCommit=$(GIT_COMMIT)' -X '$(OBJPREFIX).GoVersion=$(GO_VERSION)'" \
${APPLE_SILICON_FLAG} -o $(INSTALL_PATH)/milvus $(PWD)/cmd/main.go 1>/dev/null
embd-milvus: build-cpp-embd print-build-info
@echo "Building **Embedded** Milvus ..."
@echo "if build fails on Mac M1 machines, rerun scripts/install_deps.sh and then run: \`export PKG_CONFIG_PATH=\"/opt/homebrew/opt/openssl@3/lib/pkgconfig\"\`"
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build \
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && \
export PKG_CONFIG_PATH="$(PKG_CONFIG)" && GO111MODULE=on $(GO) build \
-ldflags="-r /tmp/milvus/lib/ -X '$(OBJPREFIX).BuildTags=$(BUILD_TAGS)' -X '$(OBJPREFIX).BuildTime=$(BUILD_TIME)' -X '$(OBJPREFIX).GitCommit=$(GIT_COMMIT)' -X '$(OBJPREFIX).GoVersion=$(GO_VERSION)'" \
${APPLE_SILICON_FLAG} -buildmode=c-shared -o $(INSTALL_PATH)/embd-milvus.so $(PWD)/pkg/embedded/embedded.go 1>/dev/null
build-go: milvus
build-cpp: pre-proc
build-cpp:
@echo "Building Milvus cpp library ..."
@(env bash $(PWD)/scripts/core_build.sh -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)")
@(env bash $(PWD)/scripts/cwrapper_build.sh -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)")
@(env bash $(PWD)/scripts/cwrapper_rocksdb_build.sh -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)")
build-cpp-embd: pre-proc
build-cpp-embd:
@echo "Building **Embedded** Milvus cpp library ..."
@(env bash $(PWD)/scripts/core_build.sh -b -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)")
@(env bash $(PWD)/scripts/cwrapper_build.sh -b -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)")
@(env bash $(PWD)/scripts/cwrapper_rocksdb_build.sh -b -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)")
build-cpp-with-unittest: pre-proc
build-cpp-with-unittest:
@echo "Building Milvus cpp library with unittest ..."
@(env bash $(PWD)/scripts/core_build.sh -t ${mode} -u -f "$(CUSTOM_THIRDPARTY_PATH)")
@(env bash $(PWD)/scripts/cwrapper_build.sh -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)")
@(env bash $(PWD)/scripts/cwrapper_rocksdb_build.sh -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)")
build-cpp-with-coverage: pre-proc
build-cpp-with-coverage:
@echo "Building Milvus cpp library with coverage and unittest ..."
@(env bash $(PWD)/scripts/core_build.sh -t ${mode} -u -c -f "$(CUSTOM_THIRDPARTY_PATH)")
@(env bash $(PWD)/scripts/cwrapper_build.sh -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)")
@(env bash $(PWD)/scripts/cwrapper_rocksdb_build.sh -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)")
# Run the tests.
@ -208,7 +193,7 @@ docker: install
install: all
@echo "Installing binary to './bin'"
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/milvus $(GOPATH)/bin/milvus
@mkdir -p $(LIBRARY_PATH) && cp -r -P $(PWD)/internal/core/output/lib/* $(LIBRARY_PATH)
@mkdir -p $(LIBRARY_PATH) && cp -r -P $(PWD)/internal/core/output/lib/*.so* $(LIBRARY_PATH)
@echo "Installation successful."
clean:
@ -219,11 +204,8 @@ clean:
@rm -rf lib/
@rm -rf $(GOPATH)/bin/milvus
@rm -rf cmake_build
@rm -rf cwrapper_rocksdb_build
@rm -rf cwrapper_build
@rm -rf internal/storage/cwrapper/output
@rm -rf internal/core/output
@rm -rf internal/kv/rocksdb/cwrapper/output
milvus-tools: print-build-info
@echo "Building tools ..."
@ -231,7 +213,7 @@ milvus-tools: print-build-info
-ldflags="-X 'main.BuildTags=$(BUILD_TAGS)' -X 'main.BuildTime=$(BUILD_TIME)' -X 'main.GitCommit=$(GIT_COMMIT)' -X 'main.GoVersion=$(GO_VERSION)'" \
-o $(INSTALL_PATH)/tools $(PWD)/cmd/tools/* 1>/dev/null
rpm-setup:
rpm-setup:
@echo "Setuping rpm env ...;"
@build/rpm/setup-env.sh

1
go.mod
View File

@ -68,4 +68,5 @@ replace (
github.com/dgrijalva/jwt-go => github.com/golang-jwt/jwt v3.2.2+incompatible // Fix security alert for jwt-go 3.2.0
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect
)

2
go.sum
View File

@ -813,6 +813,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182aff
github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/pulsar-client-go v0.6.7 h1:0HTCNWZ9jM3PNdjteFSjkxu62IJts6JoUNa0RkxWzV8=
github.com/milvus-io/pulsar-client-go v0.6.7/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc=
github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100=

View File

@ -240,6 +240,13 @@ endif ()
add_custom_target( Clean-All COMMAND ${CMAKE_BUILD_TOOL} clean )
# **************************** Install ****************************
# Install storage
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/storage/
DESTINATION include/storage
FILES_MATCHING PATTERN "*_c.h"
)
# Install segcore
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/segcore/
DESTINATION include/segcore
@ -256,4 +263,4 @@ install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/indexbuilder/
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/common/
DESTINATION include/common
FILES_MATCHING PATTERN "*_c.h"
)
)

View File

@ -203,6 +203,12 @@ function(ADD_THIRDPARTY_LIB LIB_NAME)
endif()
endfunction()
function(MILVUS_ADD_PKG_CONFIG MODULE)
configure_file(${MODULE}.pc.in "${CMAKE_CURRENT_BINARY_DIR}/${MODULE}.pc" @ONLY)
install(FILES "${CMAKE_CURRENT_BINARY_DIR}/${MODULE}.pc"
DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
endfunction()
MACRO (import_mysql_inc)
find_path (MYSQL_INCLUDE_DIR
NAMES "mysql.h"

View File

@ -28,3 +28,4 @@ add_subdirectory( common )
add_subdirectory( indexbuilder )
add_subdirectory( config )
add_subdirectory( index )
add_subdirectory( storage )

View File

@ -9,6 +9,8 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License
milvus_add_pkg_config("milvus_common")
set(COMMON_SRC
Schema.cpp
Types.cpp
@ -17,9 +19,7 @@ set(COMMON_SRC
memory_c.cpp
)
add_library(milvus_common SHARED
${COMMON_SRC}
)
add_library(milvus_common SHARED ${COMMON_SRC})
if ( MSYS )
target_link_libraries(milvus_common
@ -48,4 +48,4 @@ else()
)
endif()
install(TARGETS milvus_common DESTINATION lib)
install(TARGETS milvus_common DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -0,0 +1,9 @@
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
Name: Milvus Common
Description: Common modules for Milvus
Version: @MILVUS_VERSION@
Libs: -L${libdir} -lmilvus_common
Cflags: -I${includedir}

View File

@ -29,4 +29,4 @@ target_link_libraries(milvus_index
${PLATFORM_LIBS}
)
install(TARGETS milvus_index DESTINATION lib)
install(TARGETS milvus_index DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -17,9 +17,9 @@ set(INDEXBUILDER_FILES
utils.cpp
ScalarIndexCreator.cpp
)
add_library(milvus_indexbuilder SHARED
${INDEXBUILDER_FILES}
)
milvus_add_pkg_config("milvus_indexbuilder")
add_library(milvus_indexbuilder SHARED ${INDEXBUILDER_FILES})
find_library(TBB NAMES tbb)
set(PLATFORM_LIBS dl)
@ -37,4 +37,4 @@ target_link_libraries(milvus_indexbuilder
pthread
)
install(TARGETS milvus_indexbuilder DESTINATION lib)
install(TARGETS milvus_indexbuilder DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -0,0 +1,9 @@
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
Name: Milvus IndexBuilder
Description: IndexBuilder modules for Milvus
Version: @MILVUS_VERSION@
Libs: -L${libdir} -lmilvus_indexbuilder
Cflags: -I${includedir}

View File

@ -15,6 +15,8 @@ if ( EMBEDDED_MILVUS )
add_compile_definitions( EMBEDDED_MILVUS )
endif()
milvus_add_pkg_config("milvus_segcore")
set(SEGCORE_FILES
Collection.cpp
collection_c.cpp
@ -34,9 +36,7 @@ set(SEGCORE_FILES
TimestampIndex.cpp
Utils.cpp
ConcurrentVector.cpp)
add_library(milvus_segcore SHARED
${SEGCORE_FILES}
)
add_library(milvus_segcore SHARED ${SEGCORE_FILES})
find_library(TBB NAMES tbb)
set(PLATFORM_LIBS dl)
@ -54,4 +54,4 @@ target_link_libraries(milvus_segcore
# gperftools
)
install(TARGETS milvus_segcore DESTINATION lib)
install(TARGETS milvus_segcore DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -0,0 +1,9 @@
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
Name: Milvus Segcore
Description: Segcore modules for Milvus
Version: @MILVUS_VERSION@
Libs: -L${libdir} -lmilvus_segcore
Cflags: -I${includedir}

View File

@ -14,19 +14,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
add_executable(wrapper_test
ParquetWrapperTest.cpp)
include(FetchContent)
FetchContent_Declare(googletest
URL "https://github.com/google/googletest/archive/release-1.10.0.tar.gz")
set(BUILD_GMOCK CACHE BOOL OFF)
set(INSTALL_GTEST CACHE BOOL OFF)
FetchContent_MakeAvailable(googletest)
option( EMBEDDED_MILVUS "Enable embedded Milvus" OFF )
target_link_libraries(wrapper_test PRIVATE
gtest_main
wrapper
)
if ( EMBEDDED_MILVUS )
add_compile_definitions( EMBEDDED_MILVUS )
endif()
install(TARGETS wrapper_test DESTINATION ${CMAKE_INSTALL_PREFIX})
milvus_add_pkg_config("milvus_storage")
set(STORAGE_FILES parquet_c.cpp PayloadStream.cpp)
add_library(milvus_storage SHARED ${STORAGE_FILES})
target_link_libraries( milvus_storage PUBLIC arrow parquet pthread)
if(NOT CMAKE_INSTALL_PREFIX)
set(CMAKE_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR})
endif()
install(TARGETS milvus_storage DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -0,0 +1,59 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
enum ColumnType : int {
NONE = 0,
BOOL = 1,
INT8 = 2,
INT16 = 3,
INT32 = 4,
INT64 = 5,
FLOAT = 10,
DOUBLE = 11,
STRING = 20,
VARCHAR = 21,
VECTOR_BINARY = 100,
VECTOR_FLOAT = 101
};
enum ErrorCode : int {
SUCCESS = 0,
UNEXPECTED_ERROR = 1,
CONNECT_FAILED = 2,
PERMISSION_DENIED = 3,
COLLECTION_NOT_EXISTS = 4,
ILLEGAL_ARGUMENT = 5,
ILLEGAL_DIMENSION = 7,
ILLEGAL_INDEX_TYPE = 8,
ILLEGAL_COLLECTION_NAME = 9,
ILLEGAL_TOPK = 10,
ILLEGAL_ROWRECORD = 11,
ILLEGAL_VECTOR_ID = 12,
ILLEGAL_SEARCH_RESULT = 13,
FILE_NOT_FOUND = 14,
META_FAILED = 15,
CACHE_FAILED = 16,
CANNOT_CREATE_FOLDER = 17,
CANNOT_CREATE_FILE = 18,
CANNOT_DELETE_FOLDER = 19,
CANNOT_DELETE_FILE = 20,
BUILD_INDEX_ERROR = 21,
ILLEGAL_NLIST = 22,
ILLEGAL_METRIC_TYPE = 23,
OUT_OF_MEMORY = 24,
DD_REQUEST_RACE = 1000
};

View File

@ -0,0 +1,121 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "PayloadStream.h"
namespace wrapper {
PayloadOutputStream::PayloadOutputStream() {
buffer_.reserve(1024 * 1024);
closed_ = false;
}
PayloadOutputStream::~PayloadOutputStream() noexcept {
}
arrow::Status
PayloadOutputStream::Close() {
closed_ = true;
return arrow::Status::OK();
}
arrow::Result<int64_t>
PayloadOutputStream::Tell() const {
return arrow::Result<int64_t>(buffer_.size());
}
bool
PayloadOutputStream::closed() const {
return closed_;
}
arrow::Status
PayloadOutputStream::Write(const void* data, int64_t nbytes) {
if (nbytes <= 0)
return arrow::Status::OK();
auto size = buffer_.size();
buffer_.resize(size + nbytes);
std::memcpy(buffer_.data() + size, data, nbytes);
return arrow::Status::OK();
}
arrow::Status
PayloadOutputStream::Flush() {
return arrow::Status::OK();
}
const std::vector<uint8_t>&
PayloadOutputStream::Buffer() const {
return buffer_;
}
PayloadInputStream::PayloadInputStream(const uint8_t* data, int64_t size)
: data_(data), size_(size), tell_(0), closed_(false) {
}
PayloadInputStream::~PayloadInputStream() noexcept {
}
arrow::Status
PayloadInputStream::Close() {
closed_ = true;
return arrow::Status::OK();
}
bool
PayloadInputStream::closed() const {
return closed_;
}
arrow::Result<int64_t>
PayloadInputStream::Tell() const {
return arrow::Result<int64_t>(tell_);
}
arrow::Status
PayloadInputStream::Seek(int64_t position) {
if (position < 0 || position >= size_)
return arrow::Status::IOError("invalid position");
tell_ = position;
return arrow::Status::OK();
}
arrow::Result<int64_t>
PayloadInputStream::Read(int64_t nbytes, void* out) {
auto remain = size_ - tell_;
if (nbytes > remain)
nbytes = remain;
std::memcpy(out, data_ + tell_, nbytes);
tell_ += nbytes;
return arrow::Result<int64_t>(nbytes);
}
arrow::Result<std::shared_ptr<arrow::Buffer>>
PayloadInputStream::Read(int64_t nbytes) {
auto remain = size_ - tell_;
if (nbytes > remain)
nbytes = remain;
auto buf = std::make_shared<arrow::Buffer>(data_ + tell_, nbytes);
tell_ += nbytes;
return arrow::Result<std::shared_ptr<arrow::Buffer>>(buf);
}
arrow::Result<int64_t>
PayloadInputStream::GetSize() {
return arrow::Result<int64_t>(size_);
}
} // namespace wrapper

View File

@ -0,0 +1,105 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <vector>
#include <memory>
#include <arrow/api.h>
#include <arrow/io/interfaces.h>
#include <parquet/arrow/writer.h>
#include <parquet/arrow/reader.h>
#include "ColumnType.h"
namespace wrapper {
class PayloadOutputStream;
class PayloadInputStream;
constexpr int EMPTY_DIMENSION = -1;
struct PayloadWriter {
ColumnType columnType;
int dimension; // binary vector, float vector
std::shared_ptr<arrow::ArrayBuilder> builder;
std::shared_ptr<arrow::Schema> schema;
std::shared_ptr<PayloadOutputStream> output;
int rows;
};
struct PayloadReader {
ColumnType column_type;
std::shared_ptr<PayloadInputStream> input;
std::unique_ptr<parquet::arrow::FileReader> reader;
std::shared_ptr<arrow::Table> table;
std::shared_ptr<arrow::ChunkedArray> column;
std::shared_ptr<arrow::Array> array;
bool* bValues;
};
class PayloadOutputStream : public arrow::io::OutputStream {
public:
PayloadOutputStream();
~PayloadOutputStream();
arrow::Status
Close() override;
arrow::Result<int64_t>
Tell() const override;
bool
closed() const override;
arrow::Status
Write(const void* data, int64_t nbytes) override;
arrow::Status
Flush() override;
public:
const std::vector<uint8_t>&
Buffer() const;
private:
std::vector<uint8_t> buffer_;
bool closed_;
};
class PayloadInputStream : public arrow::io::RandomAccessFile {
public:
PayloadInputStream(const uint8_t* data, int64_t size);
~PayloadInputStream();
arrow::Status
Close() override;
arrow::Result<int64_t>
Tell() const override;
bool
closed() const override;
arrow::Status
Seek(int64_t position) override;
arrow::Result<int64_t>
Read(int64_t nbytes, void* out) override;
arrow::Result<std::shared_ptr<arrow::Buffer>>
Read(int64_t nbytes) override;
arrow::Result<int64_t>
GetSize() override;
private:
const uint8_t* data_;
const int64_t size_;
int64_t tell_;
bool closed_;
};
} // namespace wrapper

View File

@ -0,0 +1,10 @@
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
Name: Milvus Storage
Description: Storage wrapper for Milvus
Version: @MILVUS_VERSION@
Libs: -L${libdir} -lmilvus_storage
Cflags: -I${includedir}

View File

@ -0,0 +1,556 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "storage/parquet_c.h"
#include "storage/PayloadStream.h"
static const char*
ErrorMsg(const std::string& msg) {
if (msg.empty())
return nullptr;
auto ret = (char*)malloc(msg.size() + 1);
std::memcpy(ret, msg.c_str(), msg.size());
ret[msg.size()] = '\0';
return ret;
}
extern "C" CPayloadWriter
NewPayloadWriter(int columnType) {
auto p = new wrapper::PayloadWriter;
p->builder = nullptr;
p->schema = nullptr;
p->output = nullptr;
p->dimension = wrapper::EMPTY_DIMENSION;
p->rows = 0;
switch (static_cast<ColumnType>(columnType)) {
case ColumnType::BOOL: {
p->columnType = ColumnType::BOOL;
p->builder = std::make_shared<arrow::BooleanBuilder>();
p->schema = arrow::schema({arrow::field("val", arrow::boolean())});
break;
}
case ColumnType::INT8: {
p->columnType = ColumnType::INT8;
p->builder = std::make_shared<arrow::Int8Builder>();
p->schema = arrow::schema({arrow::field("val", arrow::int8())});
break;
}
case ColumnType::INT16: {
p->columnType = ColumnType::INT16;
p->builder = std::make_shared<arrow::Int16Builder>();
p->schema = arrow::schema({arrow::field("val", arrow::int16())});
break;
}
case ColumnType::INT32: {
p->columnType = ColumnType::INT32;
p->builder = std::make_shared<arrow::Int32Builder>();
p->schema = arrow::schema({arrow::field("val", arrow::int32())});
break;
}
case ColumnType::INT64: {
p->columnType = ColumnType::INT64;
p->builder = std::make_shared<arrow::Int64Builder>();
p->schema = arrow::schema({arrow::field("val", arrow::int64())});
break;
}
case ColumnType::FLOAT: {
p->columnType = ColumnType::FLOAT;
p->builder = std::make_shared<arrow::FloatBuilder>();
p->schema = arrow::schema({arrow::field("val", arrow::float32())});
break;
}
case ColumnType::DOUBLE: {
p->columnType = ColumnType::DOUBLE;
p->builder = std::make_shared<arrow::DoubleBuilder>();
p->schema = arrow::schema({arrow::field("val", arrow::float64())});
break;
}
case ColumnType::VARCHAR:
case ColumnType::STRING: {
p->columnType = ColumnType::STRING;
p->builder = std::make_shared<arrow::StringBuilder>();
p->schema = arrow::schema({arrow::field("val", arrow::utf8())});
break;
}
case ColumnType::VECTOR_BINARY: {
p->columnType = ColumnType::VECTOR_BINARY;
p->dimension = wrapper::EMPTY_DIMENSION;
break;
}
case ColumnType::VECTOR_FLOAT: {
p->columnType = ColumnType::VECTOR_FLOAT;
p->dimension = wrapper::EMPTY_DIMENSION;
break;
}
default: {
delete p;
return nullptr;
}
}
return reinterpret_cast<CPayloadWriter>(p);
}
template <typename DT, typename BT>
CStatus
AddValuesToPayload(CPayloadWriter payloadWriter, DT* values, int length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
if (length <= 0)
return st;
auto p = reinterpret_cast<wrapper::PayloadWriter*>(payloadWriter);
auto builder = std::dynamic_pointer_cast<BT>(p->builder);
if (builder == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
if (p->output != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("payload has finished");
return st;
}
auto ast = builder->AppendValues(values, values + length);
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
p->rows += length;
return st;
}
extern "C" CStatus
AddBooleanToPayload(CPayloadWriter payloadWriter, bool* values, int length) {
return AddValuesToPayload<bool, arrow::BooleanBuilder>(payloadWriter, values, length);
}
extern "C" CStatus
AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t* values, int length) {
return AddValuesToPayload<int8_t, arrow::Int8Builder>(payloadWriter, values, length);
}
extern "C" CStatus
AddInt16ToPayload(CPayloadWriter payloadWriter, int16_t* values, int length) {
return AddValuesToPayload<int16_t, arrow::Int16Builder>(payloadWriter, values, length);
}
extern "C" CStatus
AddInt32ToPayload(CPayloadWriter payloadWriter, int32_t* values, int length) {
return AddValuesToPayload<int32_t, arrow::Int32Builder>(payloadWriter, values, length);
}
extern "C" CStatus
AddInt64ToPayload(CPayloadWriter payloadWriter, int64_t* values, int length) {
return AddValuesToPayload<int64_t, arrow::Int64Builder>(payloadWriter, values, length);
}
extern "C" CStatus
AddFloatToPayload(CPayloadWriter payloadWriter, float* values, int length) {
return AddValuesToPayload<float, arrow::FloatBuilder>(payloadWriter, values, length);
}
extern "C" CStatus
AddDoubleToPayload(CPayloadWriter payloadWriter, double* values, int length) {
return AddValuesToPayload<double, arrow::DoubleBuilder>(payloadWriter, values, length);
}
extern "C" CStatus
AddOneStringToPayload(CPayloadWriter payloadWriter, char* cstr, int str_size) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadWriter*>(payloadWriter);
auto builder = std::dynamic_pointer_cast<arrow::StringBuilder>(p->builder);
if (builder == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
if (p->output != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("payload has finished");
return st;
}
arrow::Status ast;
if (cstr == nullptr || str_size < 0) {
ast = builder->AppendNull();
} else {
ast = builder->Append(cstr, str_size);
}
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
p->rows++;
return st;
}
extern "C" CStatus
AddBinaryVectorToPayload(CPayloadWriter payloadWriter, uint8_t* values, int dimension, int length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
if (length <= 0)
return st;
auto p = reinterpret_cast<wrapper::PayloadWriter*>(payloadWriter);
if (p->dimension == wrapper::EMPTY_DIMENSION) {
if ((dimension % 8) || (dimension <= 0)) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect dimension value");
return st;
}
if (p->builder != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
p->builder = std::make_shared<arrow::FixedSizeBinaryBuilder>(arrow::fixed_size_binary(dimension / 8));
p->schema = arrow::schema({arrow::field("val", arrow::fixed_size_binary(dimension / 8))});
p->dimension = dimension;
} else if (p->dimension != dimension) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("dimension changed");
return st;
}
auto builder = std::dynamic_pointer_cast<arrow::FixedSizeBinaryBuilder>(p->builder);
if (builder == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
if (p->output != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("payload has finished");
return st;
}
auto ast = builder->AppendValues(values, length);
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
p->rows += length;
return st;
}
extern "C" CStatus
AddFloatVectorToPayload(CPayloadWriter payloadWriter, float* values, int dimension, int length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
if (length <= 0)
return st;
auto p = reinterpret_cast<wrapper::PayloadWriter*>(payloadWriter);
if (p->dimension == wrapper::EMPTY_DIMENSION) {
if (p->builder != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
p->builder =
std::make_shared<arrow::FixedSizeBinaryBuilder>(arrow::fixed_size_binary(dimension * sizeof(float)));
p->schema = arrow::schema({arrow::field("val", arrow::fixed_size_binary(dimension * sizeof(float)))});
p->dimension = dimension;
} else if (p->dimension != dimension) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("dimension changed");
return st;
}
auto builder = std::dynamic_pointer_cast<arrow::FixedSizeBinaryBuilder>(p->builder);
if (builder == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
if (p->output != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("payload has finished");
return st;
}
auto ast = builder->AppendValues(reinterpret_cast<const uint8_t*>(values), length);
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
p->rows += length;
return st;
}
extern "C" CStatus
FinishPayloadWriter(CPayloadWriter payloadWriter) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadWriter*>(payloadWriter);
if (p->builder == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("arrow builder is nullptr");
return st;
}
if (p->output == nullptr) {
std::shared_ptr<arrow::Array> array;
auto ast = p->builder->Finish(&array);
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
auto table = arrow::Table::Make(p->schema, {array});
p->output = std::make_shared<wrapper::PayloadOutputStream>();
auto mem_pool = arrow::default_memory_pool();
ast = parquet::arrow::WriteTable(
*table, mem_pool, p->output, 1024 * 1024 * 1024,
parquet::WriterProperties::Builder().compression(arrow::Compression::ZSTD)->compression_level(3)->build());
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
}
return st;
}
CBuffer
GetPayloadBufferFromWriter(CPayloadWriter payloadWriter) {
CBuffer buf;
auto p = reinterpret_cast<wrapper::PayloadWriter*>(payloadWriter);
if (p->output == nullptr) {
buf.length = 0;
buf.data = nullptr;
return buf;
}
auto& output = p->output->Buffer();
buf.length = static_cast<int>(output.size());
buf.data = (char*)(output.data());
return buf;
}
int
GetPayloadLengthFromWriter(CPayloadWriter payloadWriter) {
auto p = reinterpret_cast<wrapper::PayloadWriter*>(payloadWriter);
return p->rows;
}
extern "C" void
ReleasePayloadWriter(CPayloadWriter handler) {
auto p = reinterpret_cast<wrapper::PayloadWriter*>(handler);
if (p != nullptr)
delete p;
arrow::default_memory_pool()->ReleaseUnused();
}
extern "C" CPayloadReader
NewPayloadReader(int columnType, uint8_t* buffer, int64_t buf_size) {
auto p = new wrapper::PayloadReader;
p->bValues = nullptr;
p->input = std::make_shared<wrapper::PayloadInputStream>(buffer, buf_size);
auto mem_pool = arrow::default_memory_pool();
auto st = parquet::arrow::OpenFile(p->input, mem_pool, &p->reader);
if (!st.ok()) {
delete p;
return nullptr;
}
st = p->reader->ReadTable(&p->table);
if (!st.ok()) {
delete p;
return nullptr;
}
p->column = p->table->column(0);
assert(p->column != nullptr);
assert(p->column->chunks().size() == 1);
p->array = p->column->chunk(0);
switch (columnType) {
case ColumnType::BOOL:
case ColumnType::INT8:
case ColumnType::INT16:
case ColumnType::INT32:
case ColumnType::INT64:
case ColumnType::FLOAT:
case ColumnType::DOUBLE:
case ColumnType::STRING:
case ColumnType::VARCHAR:
case ColumnType::VECTOR_BINARY:
case ColumnType::VECTOR_FLOAT: {
break;
}
default: {
delete p;
return nullptr;
}
}
return reinterpret_cast<CPayloadReader>(p);
}
extern "C" CStatus
GetBoolFromPayload(CPayloadReader payloadReader, bool** values, int* length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadReader*>(payloadReader);
if (p->bValues == nullptr) {
auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(p->array);
if (array == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
int len = array->length();
p->bValues = new bool[len];
for (int i = 0; i < len; i++) {
p->bValues[i] = array->Value(i);
}
}
*values = p->bValues;
*length = p->array->length();
return st;
}
template <typename DT, typename AT>
CStatus
GetValuesFromPayload(CPayloadReader payloadReader, DT** values, int* length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadReader*>(payloadReader);
auto array = std::dynamic_pointer_cast<AT>(p->array);
if (array == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
*length = array->length();
*values = (DT*)array->raw_values();
return st;
}
extern "C" CStatus
GetInt8FromPayload(CPayloadReader payloadReader, int8_t** values, int* length) {
return GetValuesFromPayload<int8_t, arrow::Int8Array>(payloadReader, values, length);
}
extern "C" CStatus
GetInt16FromPayload(CPayloadReader payloadReader, int16_t** values, int* length) {
return GetValuesFromPayload<int16_t, arrow::Int16Array>(payloadReader, values, length);
}
extern "C" CStatus
GetInt32FromPayload(CPayloadReader payloadReader, int32_t** values, int* length) {
return GetValuesFromPayload<int32_t, arrow::Int32Array>(payloadReader, values, length);
}
extern "C" CStatus
GetInt64FromPayload(CPayloadReader payloadReader, int64_t** values, int* length) {
return GetValuesFromPayload<int64_t, arrow::Int64Array>(payloadReader, values, length);
}
extern "C" CStatus
GetFloatFromPayload(CPayloadReader payloadReader, float** values, int* length) {
return GetValuesFromPayload<float, arrow::FloatArray>(payloadReader, values, length);
}
extern "C" CStatus
GetDoubleFromPayload(CPayloadReader payloadReader, double** values, int* length) {
return GetValuesFromPayload<double, arrow::DoubleArray>(payloadReader, values, length);
}
extern "C" CStatus
GetOneStringFromPayload(CPayloadReader payloadReader, int idx, char** cstr, int* str_size) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadReader*>(payloadReader);
auto array = std::dynamic_pointer_cast<arrow::StringArray>(p->array);
if (array == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("Incorrect data type");
return st;
}
if (idx >= array->length()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("memory overflow");
return st;
}
arrow::StringArray::offset_type length;
*cstr = (char*)array->GetValue(idx, &length);
*str_size = length;
return st;
}
extern "C" CStatus
GetBinaryVectorFromPayload(CPayloadReader payloadReader, uint8_t** values, int* dimension, int* length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadReader*>(payloadReader);
auto array = std::dynamic_pointer_cast<arrow::FixedSizeBinaryArray>(p->array);
if (array == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("Incorrect data type");
return st;
}
*dimension = array->byte_width() * 8;
*length = array->length();
*values = (uint8_t*)array->raw_values();
return st;
}
extern "C" CStatus
GetFloatVectorFromPayload(CPayloadReader payloadReader, float** values, int* dimension, int* length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadReader*>(payloadReader);
auto array = std::dynamic_pointer_cast<arrow::FixedSizeBinaryArray>(p->array);
if (array == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("Incorrect data type");
return st;
}
*dimension = array->byte_width() / sizeof(float);
*length = array->length();
*values = (float*)array->raw_values();
return st;
}
extern "C" int
GetPayloadLengthFromReader(CPayloadReader payloadReader) {
auto p = reinterpret_cast<wrapper::PayloadReader*>(payloadReader);
if (p->array == nullptr)
return 0;
return p->array->length();
}
extern "C" void
ReleasePayloadReader(CPayloadReader payloadReader) {
auto p = reinterpret_cast<wrapper::PayloadReader*>(payloadReader);
if (p != nullptr) {
delete[] p->bValues;
delete p;
}
arrow::default_memory_pool()->ReleaseUnused();
}

View File

@ -0,0 +1,102 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
typedef struct CBuffer {
char* data;
int length;
} CBuffer;
typedef struct CStatus {
int error_code;
const char* error_msg;
} CStatus;
//============= payload writer ======================
typedef void* CPayloadWriter;
CPayloadWriter
NewPayloadWriter(int columnType);
CStatus
AddBooleanToPayload(CPayloadWriter payloadWriter, bool* values, int length);
CStatus
AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t* values, int length);
CStatus
AddInt16ToPayload(CPayloadWriter payloadWriter, int16_t* values, int length);
CStatus
AddInt32ToPayload(CPayloadWriter payloadWriter, int32_t* values, int length);
CStatus
AddInt64ToPayload(CPayloadWriter payloadWriter, int64_t* values, int length);
CStatus
AddFloatToPayload(CPayloadWriter payloadWriter, float* values, int length);
CStatus
AddDoubleToPayload(CPayloadWriter payloadWriter, double* values, int length);
CStatus
AddOneStringToPayload(CPayloadWriter payloadWriter, char* cstr, int str_size);
CStatus
AddBinaryVectorToPayload(CPayloadWriter payloadWriter, uint8_t* values, int dimension, int length);
CStatus
AddFloatVectorToPayload(CPayloadWriter payloadWriter, float* values, int dimension, int length);
CStatus
FinishPayloadWriter(CPayloadWriter payloadWriter);
CBuffer
GetPayloadBufferFromWriter(CPayloadWriter payloadWriter);
int
GetPayloadLengthFromWriter(CPayloadWriter payloadWriter);
void
ReleasePayloadWriter(CPayloadWriter handler);
//============= payload reader ======================
typedef void* CPayloadReader;
CPayloadReader
NewPayloadReader(int columnType, uint8_t* buffer, int64_t buf_size);
CStatus
GetBoolFromPayload(CPayloadReader payloadReader, bool** values, int* length);
CStatus
GetInt8FromPayload(CPayloadReader payloadReader, int8_t** values, int* length);
CStatus
GetInt16FromPayload(CPayloadReader payloadReader, int16_t** values, int* length);
CStatus
GetInt32FromPayload(CPayloadReader payloadReader, int32_t** values, int* length);
CStatus
GetInt64FromPayload(CPayloadReader payloadReader, int64_t** values, int* length);
CStatus
GetFloatFromPayload(CPayloadReader payloadReader, float** values, int* length);
CStatus
GetDoubleFromPayload(CPayloadReader payloadReader, double** values, int* length);
CStatus
GetOneStringFromPayload(CPayloadReader payloadReader, int idx, char** cstr, int* str_size);
CStatus
GetBinaryVectorFromPayload(CPayloadReader payloadReader, uint8_t** values, int* dimension, int* length);
CStatus
GetFloatVectorFromPayload(CPayloadReader payloadReader, float** values, int* dimension, int* length);
int
GetPayloadLengthFromReader(CPayloadReader payloadReader);
void
ReleasePayloadReader(CPayloadReader payloadReader);
#ifdef __cplusplus
}
#endif

View File

@ -57,11 +57,12 @@ add_subdirectory( yaml-cpp )
# ****************************** Thirdparty opentracing ***************************************
if ( MILVUS_WITH_OPENTRACING )
add_subdirectory( opentracing )
endif()
endif()
add_subdirectory( protobuf )
add_subdirectory( boost_ext )
add_subdirectory( arrow )
add_subdirectory( rocksdb )
# ******************************* Thridparty marisa ********************************
# TODO: support win.

View File

@ -52,7 +52,7 @@ macro( build_arrow )
"-DCMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX}"
"-DCMAKE_INCLUDE_PATH=${Boost_INCLUDE_DIRS}"
)
if (WIN32)
if ( WIN32 )
set( ARROW_CMAKE_ARGS ${ARROW_CMAKE_ARGS} "-DARROW_JEMALLOC=OFF" )
else ()
set( ARROW_CMAKE_ARGS ${ARROW_CMAKE_ARGS} "-DARROW_JEMALLOC=ON" )

View File

@ -14,47 +14,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
cmake_minimum_required( VERSION 3.18 )
project(wrapper)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
include( ExternalProject )
if ( APPLE )
set( ROCKSDB_VERSION "6.27.3" )
set( ROCKSDB_SOURCE_URL
"https://github.com/facebook/rocksdb/archive/v${ROCKSDB_VERSION}.tar.gz")
else ()
set( ROCKSDB_VERSION "6.15.2" )
set( ROCKSDB_SOURCE_URL
"https://github.com/facebook/rocksdb/archive/v${ROCKSDB_VERSION}.tar.gz")
endif ()
if( CUSTOM_THIRDPARTY_DOWNLOAD_PATH )
set( THIRDPARTY_DOWNLOAD_PATH ${CUSTOM_THIRDPARTY_DOWNLOAD_PATH} )
else()
set( THIRDPARTY_DOWNLOAD_PATH ${CMAKE_BINARY_DIR}/3rdparty_download/download )
endif()
message( STATUS "Thirdparty downloaded file path: ${THIRDPARTY_DOWNLOAD_PATH}" )
#-----------------------Using ccache if possible------------
find_program(CCACHE_FOUND ccache)
if (CCACHE_FOUND)
message(STATUS "Using ccache: ${CCACHE_FOUND}")
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ${CCACHE_FOUND})
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ${CCACHE_FOUND})
set(ENV{CCACHE_COMMENTS} "1")
endif (CCACHE_FOUND)
# if ( APPLE )
set( ROCKSDB_VERSION "6.27.3" )
set( ROCKSDB_SOURCE_URL
"https://github.com/facebook/rocksdb/archive/v${ROCKSDB_VERSION}.tar.gz")
# else ()
# set( ROCKSDB_VERSION "6.15.2" )
# set( ROCKSDB_SOURCE_URL
# "https://github.com/facebook/rocksdb/archive/v${ROCKSDB_VERSION}.tar.gz")
# endif ()
macro( build_rocksdb )
message( STATUS "Building ROCKSDB-${ROCKSDB_VERSION} from source" )
if ( APPLE )
set ( ROCKSDB_URL_MD5 "e4a0625f0cec82060e62c81b787a1124" )
else ()
set ( ROCKSDB_URL_MD5 "67f9e04fda62af551dd039c37b1808ac" )
endif ()
# if ( APPLE )
set ( ROCKSDB_MD5 "e4a0625f0cec82060e62c81b787a1124" )
# else ()
# set ( ROCKSDB_MD5 "67f9e04fda62af551dd039c37b1808ac" )
# endif ()
if ( EMBEDDED_MILVUS )
message ( STATUS "Turning on fPIC while building embedded Milvus" )
@ -63,7 +40,12 @@ macro( build_rocksdb )
set( ROCKSDB_CMAKE_ARGS
"-DWITH_GFLAGS=OFF"
"-DROCKSDB_BUILD_SHARED=OFF"
"-DWITH_TESTS=OFF"
"-DWITH_BENCHMARK_TOOLS=OFF"
"-DWITH_CORE_TOOLS=OFF"
"-DWITH_TOOLS=OFF"
"-DCMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX}"
"-DROCKSDB_INSTALL_ON_WINDOWS=ON"
"-DFAIL_ON_WARNINGS=OFF"
${FPIC_ARG}
#This is used to solve 'illegal instruction' problem in some machine
@ -77,9 +59,8 @@ macro( build_rocksdb )
DOWNLOAD_DIR ${THIRDPARTY_DOWNLOAD_PATH}
INSTALL_DIR ${CMAKE_INSTALL_PREFIX}
URL ${ROCKSDB_SOURCE_URL}
URL_MD5 ${ROCKSDB_URL_MD5}
URL_MD5 ${ROCKSDB_MD5}
CMAKE_ARGS ${ROCKSDB_CMAKE_ARGS}
INSTALL_COMMAND make install
)
ExternalProject_Get_Property( rocksdb-ep INSTALL_DIR )
@ -89,6 +70,14 @@ macro( build_rocksdb )
file( MAKE_DIRECTORY "${INSTALL_DIR}/include" )
endif()
milvus_add_pkg_config("rocksdb")
add_library( rocksdb STATIC IMPORTED )
set_target_properties( rocksdb
PROPERTIES
IMPORTED_GLOBAL TRUE
IMPORTED_LOCATION ${INSTALL_DIR}/${CMAKE_INSTALL_LIBDIR}/librocksdb.a
INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include )
endmacro()
build_rocksdb()

View File

@ -0,0 +1,10 @@
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
Name: Rocksdb
Description: Rocksdb
Version: @ROCKSDB_VERSION@
Libs: -L${libdir} -lrocksdb
Libs.private: -lz -lbz2
Cflags: -I${includedir}

View File

@ -29,6 +29,7 @@ set(MILVUS_TEST_FILES
test_index_c_api.cpp
test_index_wrapper.cpp
test_init.cpp
test_parquet_c.cpp
test_plan_proto.cpp
test_query.cpp
test_reduce.cpp
@ -87,6 +88,7 @@ target_link_libraries(all_tests
milvus_indexbuilder
milvus_index
milvus_log
milvus_storage
pthread
)

View File

@ -0,0 +1,316 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <fstream>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include "storage/parquet_c.h"
#include "storage/ColumnType.h"
#include "storage/PayloadStream.h"
static void
WriteToFile(CBuffer cb) {
auto data_file = std::ofstream("/tmp/wrapper_test_data.dat", std::ios::binary);
data_file.write(cb.data, cb.length);
data_file.close();
}
static std::shared_ptr<arrow::Table>
ReadFromFile() {
std::shared_ptr<arrow::io::ReadableFile> infile;
auto rst = arrow::io::ReadableFile::Open("/tmp/wrapper_test_data.dat");
if (!rst.ok())
return nullptr;
infile = *rst;
std::shared_ptr<arrow::Table> table;
std::unique_ptr<parquet::arrow::FileReader> reader;
auto st = parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader);
if (!st.ok())
return nullptr;
st = reader->ReadTable(&table);
if (!st.ok())
return nullptr;
return table;
}
TEST(wrapper, inoutstream) {
arrow::Int64Builder i64builder;
arrow::Status st;
st = i64builder.AppendValues({1, 2, 3, 4, 5});
ASSERT_TRUE(st.ok());
std::shared_ptr<arrow::Array> i64array;
st = i64builder.Finish(&i64array);
ASSERT_TRUE(st.ok());
auto schema = arrow::schema({arrow::field("val", arrow::int64())});
ASSERT_NE(schema, nullptr);
auto table = arrow::Table::Make(schema, {i64array});
ASSERT_NE(table, nullptr);
auto os = std::make_shared<wrapper::PayloadOutputStream>();
st = parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), os, 1024);
ASSERT_TRUE(st.ok());
const uint8_t* buf = os->Buffer().data();
int64_t buf_size = os->Buffer().size();
auto is = std::make_shared<wrapper::PayloadInputStream>(buf, buf_size);
std::shared_ptr<arrow::Table> intable;
std::unique_ptr<parquet::arrow::FileReader> reader;
st = parquet::arrow::OpenFile(is, arrow::default_memory_pool(), &reader);
ASSERT_TRUE(st.ok());
st = reader->ReadTable(&intable);
ASSERT_TRUE(st.ok());
auto chunks = intable->column(0)->chunks();
ASSERT_EQ(chunks.size(), 1);
auto inarray = std::dynamic_pointer_cast<arrow::Int64Array>(chunks[0]);
ASSERT_NE(inarray, nullptr);
ASSERT_EQ(inarray->Value(0), 1);
ASSERT_EQ(inarray->Value(1), 2);
ASSERT_EQ(inarray->Value(2), 3);
ASSERT_EQ(inarray->Value(3), 4);
ASSERT_EQ(inarray->Value(4), 5);
}
TEST(wrapper, boolean) {
auto payload = NewPayloadWriter(ColumnType::BOOL);
bool data[] = {true, false, true, false};
auto st = AddBooleanToPayload(payload, data, 4);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = FinishPayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto cb = GetPayloadBufferFromWriter(payload);
ASSERT_GT(cb.length, 0);
ASSERT_NE(cb.data, nullptr);
auto nums = GetPayloadLengthFromWriter(payload);
ASSERT_EQ(nums, 4);
auto reader = NewPayloadReader(ColumnType::BOOL, (uint8_t*)cb.data, cb.length);
bool* values;
int length;
st = GetBoolFromPayload(reader, &values, &length);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_NE(values, nullptr);
ASSERT_EQ(length, 4);
length = GetPayloadLengthFromReader(reader);
ASSERT_EQ(length, 4);
for (int i = 0; i < length; i++) {
ASSERT_EQ(data[i], values[i]);
}
ReleasePayloadWriter(payload);
ReleasePayloadReader(reader);
}
#define NUMERIC_TEST(TEST_NAME, COLUMN_TYPE, DATA_TYPE, ADD_FUNC, GET_FUNC, ARRAY_TYPE) \
TEST(wrapper, TEST_NAME) { \
auto payload = NewPayloadWriter(COLUMN_TYPE); \
DATA_TYPE data[] = {-1, 1, -100, 100}; \
\
auto st = ADD_FUNC(payload, data, 4); \
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS); \
st = FinishPayloadWriter(payload); \
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS); \
auto cb = GetPayloadBufferFromWriter(payload); \
ASSERT_GT(cb.length, 0); \
ASSERT_NE(cb.data, nullptr); \
auto nums = GetPayloadLengthFromWriter(payload); \
ASSERT_EQ(nums, 4); \
\
auto reader = NewPayloadReader(COLUMN_TYPE, (uint8_t*)cb.data, cb.length); \
DATA_TYPE* values; \
int length; \
st = GET_FUNC(reader, &values, &length); \
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS); \
ASSERT_NE(values, nullptr); \
ASSERT_EQ(length, 4); \
length = GetPayloadLengthFromReader(reader); \
ASSERT_EQ(length, 4); \
\
for (int i = 0; i < length; i++) { \
ASSERT_EQ(data[i], values[i]); \
} \
\
ReleasePayloadWriter(payload); \
ReleasePayloadReader(reader); \
}
NUMERIC_TEST(int8, ColumnType::INT8, int8_t, AddInt8ToPayload, GetInt8FromPayload, arrow::Int8Array)
NUMERIC_TEST(int16, ColumnType::INT16, int16_t, AddInt16ToPayload, GetInt16FromPayload, arrow::Int16Array)
NUMERIC_TEST(int32, ColumnType::INT32, int32_t, AddInt32ToPayload, GetInt32FromPayload, arrow::Int32Array)
NUMERIC_TEST(int64, ColumnType::INT64, int64_t, AddInt64ToPayload, GetInt64FromPayload, arrow::Int64Array)
NUMERIC_TEST(float32, ColumnType::FLOAT, float, AddFloatToPayload, GetFloatFromPayload, arrow::FloatArray)
NUMERIC_TEST(float64, ColumnType::DOUBLE, double, AddDoubleToPayload, GetDoubleFromPayload, arrow::DoubleArray)
TEST(wrapper, stringarray) {
auto payload = NewPayloadWriter(ColumnType::STRING);
auto st = AddOneStringToPayload(payload, (char*)"1234", 4);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = AddOneStringToPayload(payload, (char*)"12345", 5);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
char v[3] = {0};
v[1] = 'a';
st = AddOneStringToPayload(payload, v, 3);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = FinishPayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto cb = GetPayloadBufferFromWriter(payload);
ASSERT_GT(cb.length, 0);
ASSERT_NE(cb.data, nullptr);
auto nums = GetPayloadLengthFromWriter(payload);
ASSERT_EQ(nums, 3);
auto reader = NewPayloadReader(ColumnType::STRING, (uint8_t*)cb.data, cb.length);
int length = GetPayloadLengthFromReader(reader);
ASSERT_EQ(length, 3);
char *v0, *v1, *v2;
int s0, s1, s2;
st = GetOneStringFromPayload(reader, 0, &v0, &s0);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_EQ(s0, 4);
ASSERT_EQ(v0[0], '1');
ASSERT_EQ(v0[1], '2');
ASSERT_EQ(v0[2], '3');
ASSERT_EQ(v0[3], '4');
st = GetOneStringFromPayload(reader, 1, &v1, &s1);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_EQ(s1, 5);
ASSERT_EQ(v1[0], '1');
ASSERT_EQ(v1[1], '2');
ASSERT_EQ(v1[2], '3');
ASSERT_EQ(v1[3], '4');
ASSERT_EQ(v1[4], '5');
st = GetOneStringFromPayload(reader, 2, &v2, &s2);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_EQ(s2, 3);
ASSERT_EQ(v2[0], 0);
ASSERT_EQ(v2[1], 'a');
ASSERT_EQ(v2[2], 0);
ReleasePayloadWriter(payload);
ReleasePayloadReader(reader);
}
TEST(wrapper, binary_vector) {
auto payload = NewPayloadWriter(ColumnType::VECTOR_BINARY);
uint8_t data[] = {0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8};
auto st = AddBinaryVectorToPayload(payload, data, 16, 4);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = FinishPayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto cb = GetPayloadBufferFromWriter(payload);
ASSERT_GT(cb.length, 0);
ASSERT_NE(cb.data, nullptr);
auto nums = GetPayloadLengthFromWriter(payload);
ASSERT_EQ(nums, 4);
auto reader = NewPayloadReader(ColumnType::VECTOR_BINARY, (uint8_t*)cb.data, cb.length);
uint8_t* values;
int length;
int dim;
st = GetBinaryVectorFromPayload(reader, &values, &dim, &length);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_NE(values, nullptr);
ASSERT_EQ(dim, 16);
ASSERT_EQ(length, 4);
length = GetPayloadLengthFromReader(reader);
ASSERT_EQ(length, 4);
for (int i = 0; i < 8; i++) {
ASSERT_EQ(values[i], data[i]);
}
ReleasePayloadWriter(payload);
ReleasePayloadReader(reader);
}
TEST(wrapper, float_vector) {
auto payload = NewPayloadWriter(ColumnType::VECTOR_FLOAT);
float data[] = {1, 2, 3, 4, 5, 6, 7, 8};
auto st = AddFloatVectorToPayload(payload, data, 2, 4);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = FinishPayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto cb = GetPayloadBufferFromWriter(payload);
ASSERT_GT(cb.length, 0);
ASSERT_NE(cb.data, nullptr);
auto nums = GetPayloadLengthFromWriter(payload);
ASSERT_EQ(nums, 4);
auto reader = NewPayloadReader(ColumnType::VECTOR_FLOAT, (uint8_t*)cb.data, cb.length);
float* values;
int length;
int dim;
st = GetFloatVectorFromPayload(reader, &values, &dim, &length);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_NE(values, nullptr);
ASSERT_EQ(dim, 2);
ASSERT_EQ(length, 4);
length = GetPayloadLengthFromReader(reader);
ASSERT_EQ(length, 4);
for (int i = 0; i < 8; i++) {
ASSERT_EQ(values[i], data[i]);
}
ReleasePayloadWriter(payload);
ReleasePayloadReader(reader);
}
TEST(wrapper, int8_2) {
auto payload = NewPayloadWriter(ColumnType::INT8);
int8_t data[] = {-1, 1, -100, 100};
auto st = AddInt8ToPayload(payload, data, 4);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = FinishPayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto cb = GetPayloadBufferFromWriter(payload);
ASSERT_GT(cb.length, 0);
ASSERT_NE(cb.data, nullptr);
WriteToFile(cb);
auto nums = GetPayloadLengthFromWriter(payload);
ASSERT_EQ(nums, 4);
ReleasePayloadWriter(payload);
auto table = ReadFromFile();
ASSERT_NE(table, nullptr);
auto chunks = table->column(0)->chunks();
ASSERT_EQ(chunks.size(), 1);
auto bool_array = std::dynamic_pointer_cast<arrow::Int8Array>(chunks[0]);
ASSERT_NE(bool_array, nullptr);
ASSERT_EQ(bool_array->Value(0), -1);
ASSERT_EQ(bool_array->Value(1), 1);
ASSERT_EQ(bool_array->Value(2), -100);
ASSERT_EQ(bool_array->Value(3), 100);
}

View File

@ -17,17 +17,11 @@
package indexnode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_indexbuilder -Wl,-rpath,"${SRCDIR}/../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_indexbuilder -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo windows LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_indexbuilder -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_indexbuilder
#include <stdlib.h>
#include <stdint.h>
#include "indexbuilder/init_c.h"
*/
import "C"
import (

View File

@ -1,91 +0,0 @@
#!/bin/bash
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SOURCE=${BASH_SOURCE[0]}
while [ -h $SOURCE ]; do # resolve $SOURCE until the file is no longer a symlink
DIR=$( cd -P $( dirname $SOURCE ) && pwd )
SOURCE=$(readlink $SOURCE)
[[ $SOURCE != /* ]] && SOURCE=$DIR/$SOURCE # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
done
DIR=$( cd -P $( dirname $SOURCE ) && pwd )
# echo $DIR
CGO_CFLAGS="-I$(pwd)/output/include"
if [ -f "$(pwd)/output/lib/librocksdb.a" ];then
CGO_LDFLAGS="-L$(pwd)/output/lib -l:librocksdb.a -lstdc++ -lm -lz"
elif [ -f "$(pwd)/output/lib64/librocksdb.a" ];then
CGO_LDFLAGS="-L$(pwd)/output/lib64 -l:librocksdb.a -lstdc++ -lm -lz"
else
echo "No found 'librocksdb.a'."
exit 1
fi
OUTPUT_LIB=${DIR}/output
if [ -d ${OUTPUT_LIB} ];then
rm -rf ${OUTPUT_LIB}
fi
mkdir ${OUTPUT_LIB}
BUILD_TYPE="Debug"
while getopts "t:h:" arg; do
case $arg in
t)
BUILD_TYPE=$OPTARG # BUILD_TYPE
;;
h) # help
echo "-t: build type(default: Debug)
-h: help
"
exit 0
;;
?)
echo "ERROR! unknown argument"
exit 1
;;
esac
done
echo "BUILD_TYPE: " $BUILD_TYPE
unameOut="$(uname -s)"
if [[ ! ${jobs+1} ]]; then
case "${unameOut}" in
Linux*) jobs=$(nproc);;
Darwin*)
llvm_prefix="$(brew --prefix llvm)"
export CLANG_TOOLS_PATH="${llvm_prefix}/bin"
export CC="${llvm_prefix}/bin/clang"
export CXX="${llvm_prefix}/bin/clang++"
export LDFLAGS="-L${llvm_prefix}/lib -L/usr/local/opt/libomp/lib"
export CXXFLAGS="-I${llvm_prefix}/include -I/usr/local/include -I/usr/local/opt/libomp/include"
jobs=$(sysctl -n hw.physicalcpu);;
*)
echo "Exit 0, System:${unameOut}";
exit 0;
esac
fi
pushd ${OUTPUT_LIB}
CMAKE_CMD="cmake \
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} .."
${CMAKE_CMD}
echo ${CMAKE_CMD}
make -j ${jobs} VERBOSE=0

View File

@ -17,9 +17,7 @@
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_segcore
#include "segcore/collection_c.h"
#include "common/type_c.h"

View File

@ -17,16 +17,10 @@
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo windows LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_segcore
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
*/
import "C"
import (

View File

@ -17,14 +17,10 @@
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_common -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_common -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo windows LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_common -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_common milvus_segcore
#include "segcore/load_index_c.h"
#include "common/vector_index_c.h"
*/
import "C"

View File

@ -17,15 +17,10 @@
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_segcore
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
*/
import "C"
import (

View File

@ -17,11 +17,7 @@
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_segcore
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"

View File

@ -17,10 +17,7 @@
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo windows LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_segcore
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"

View File

@ -17,12 +17,7 @@
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo windows LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_segcore
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"

View File

@ -17,13 +17,10 @@
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_segcore
#include "segcore/plan_c.h"
#include "segcore/reduce_c.h"
*/
import "C"
import (

View File

@ -17,11 +17,7 @@
package querynode
/*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo windows LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_segcore
#include "segcore/collection_c.h"
#include "segcore/plan_c.h"

View File

@ -1,4 +0,0 @@
output
cmake-build-debug
.idea
cmake_build

View File

@ -1,55 +0,0 @@
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cmake_minimum_required( VERSION 3.18 )
project(wrapper)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
include( GNUInstallDirs )
include( ExternalProject )
include( CheckCXXCompilerFlag )
set(ARROW_OUTPUT_PREFIX ${CMAKE_INSTALL_PREFIX}/../../../core/output)
add_library(wrapper STATIC)
target_sources(wrapper PUBLIC ParquetWrapper.cpp PayloadStream.cpp)
if ( NOT MSYS )
add_library(parquet STATIC IMPORTED)
set_target_properties(parquet PROPERTIES
IMPORTED_LOCATION "${ARROW_OUTPUT_PREFIX}/lib/libparquet.a"
INTERFACE_INCLUDE_DIRECTORIES "${ARROW_OUTPUT_PREFIX}/include")
endif()
if ( EMBEDDED_MILVUS )
message ( STATUS "Turning on fPIC while building embedded Milvus" )
set_target_properties( wrapper PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_SOURCE_DIR} POSITION_INDEPENDENT_CODE ON )
else()
set_target_properties( wrapper PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_SOURCE_DIR} )
endif()
target_link_libraries( wrapper PUBLIC parquet pthread)
if(NOT CMAKE_INSTALL_PREFIX)
set(CMAKE_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR})
endif()
install( TARGETS wrapper )
if (BUILD_TESTING)
add_subdirectory(test)
endif()

View File

@ -1,59 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
enum ColumnType : int {
NONE = 0,
BOOL = 1,
INT8 = 2,
INT16 = 3,
INT32 = 4,
INT64 = 5,
FLOAT = 10,
DOUBLE = 11,
STRING = 20,
VARCHAR = 21,
VECTOR_BINARY = 100,
VECTOR_FLOAT = 101
};
enum ErrorCode : int {
SUCCESS = 0,
UNEXPECTED_ERROR = 1,
CONNECT_FAILED = 2,
PERMISSION_DENIED = 3,
COLLECTION_NOT_EXISTS = 4,
ILLEGAL_ARGUMENT = 5,
ILLEGAL_DIMENSION = 7,
ILLEGAL_INDEX_TYPE = 8,
ILLEGAL_COLLECTION_NAME = 9,
ILLEGAL_TOPK = 10,
ILLEGAL_ROWRECORD = 11,
ILLEGAL_VECTOR_ID = 12,
ILLEGAL_SEARCH_RESULT = 13,
FILE_NOT_FOUND = 14,
META_FAILED = 15,
CACHE_FAILED = 16,
CANNOT_CREATE_FOLDER = 17,
CANNOT_CREATE_FILE = 18,
CANNOT_DELETE_FOLDER = 19,
CANNOT_DELETE_FILE = 20,
BUILD_INDEX_ERROR = 21,
ILLEGAL_NLIST = 22,
ILLEGAL_METRIC_TYPE = 23,
OUT_OF_MEMORY = 24,
DD_REQUEST_RACE = 1000
};

View File

@ -1,546 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ParquetWrapper.h"
#include "PayloadStream.h"
static const char *ErrorMsg(const std::string &msg) {
if (msg.empty()) return nullptr;
auto ret = (char *) malloc(msg.size() + 1);
std::memcpy(ret, msg.c_str(), msg.size());
ret[msg.size()] = '\0';
return ret;
}
extern "C"
CPayloadWriter NewPayloadWriter(int columnType) {
auto p = new wrapper::PayloadWriter;
p->builder = nullptr;
p->schema = nullptr;
p->output = nullptr;
p->dimension = wrapper::EMPTY_DIMENSION;
p->rows = 0;
switch (static_cast<ColumnType>(columnType)) {
case ColumnType::BOOL : {
p->columnType = ColumnType::BOOL;
p->builder = std::make_shared<arrow::BooleanBuilder>();
p->schema = arrow::schema({arrow::field("val", arrow::boolean())});
break;
}
case ColumnType::INT8 : {
p->columnType = ColumnType::INT8;
p->builder = std::make_shared<arrow::Int8Builder>();
p->schema = arrow::schema({arrow::field("val", arrow::int8())});
break;
}
case ColumnType::INT16 : {
p->columnType = ColumnType::INT16;
p->builder = std::make_shared<arrow::Int16Builder>();
p->schema = arrow::schema({arrow::field("val", arrow::int16())});
break;
}
case ColumnType::INT32 : {
p->columnType = ColumnType::INT32;
p->builder = std::make_shared<arrow::Int32Builder>();
p->schema = arrow::schema({arrow::field("val", arrow::int32())});
break;
}
case ColumnType::INT64 : {
p->columnType = ColumnType::INT64;
p->builder = std::make_shared<arrow::Int64Builder>();
p->schema = arrow::schema({arrow::field("val", arrow::int64())});
break;
}
case ColumnType::FLOAT : {
p->columnType = ColumnType::FLOAT;
p->builder = std::make_shared<arrow::FloatBuilder>();
p->schema = arrow::schema({arrow::field("val", arrow::float32())});
break;
}
case ColumnType::DOUBLE : {
p->columnType = ColumnType::DOUBLE;
p->builder = std::make_shared<arrow::DoubleBuilder>();
p->schema = arrow::schema({arrow::field("val", arrow::float64())});
break;
}
case ColumnType::VARCHAR:
case ColumnType::STRING : {
p->columnType = ColumnType::STRING;
p->builder = std::make_shared<arrow::StringBuilder>();
p->schema = arrow::schema({arrow::field("val", arrow::utf8())});
break;
}
case ColumnType::VECTOR_BINARY : {
p->columnType = ColumnType::VECTOR_BINARY;
p->dimension = wrapper::EMPTY_DIMENSION;
break;
}
case ColumnType::VECTOR_FLOAT : {
p->columnType = ColumnType::VECTOR_FLOAT;
p->dimension = wrapper::EMPTY_DIMENSION;
break;
}
default: {
delete p;
return nullptr;
}
}
return reinterpret_cast<CPayloadWriter>(p);
}
template<typename DT, typename BT>
CStatus AddValuesToPayload(CPayloadWriter payloadWriter, DT *values, int length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
if (length <= 0) return st;
auto p = reinterpret_cast<wrapper::PayloadWriter *>(payloadWriter);
auto builder = std::dynamic_pointer_cast<BT>(p->builder);
if (builder == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
if (p->output != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("payload has finished");
return st;
}
auto ast = builder->AppendValues(values, values + length);
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
p->rows += length;
return st;
}
extern "C"
CStatus AddBooleanToPayload(CPayloadWriter payloadWriter, bool *values, int length) {
return AddValuesToPayload<bool, arrow::BooleanBuilder>(payloadWriter, values, length);
}
extern "C"
CStatus AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t *values, int length) {
return AddValuesToPayload<int8_t, arrow::Int8Builder>(payloadWriter, values, length);
}
extern "C"
CStatus AddInt16ToPayload(CPayloadWriter payloadWriter, int16_t *values, int length) {
return AddValuesToPayload<int16_t, arrow::Int16Builder>(payloadWriter, values, length);
}
extern "C"
CStatus AddInt32ToPayload(CPayloadWriter payloadWriter, int32_t *values, int length) {
return AddValuesToPayload<int32_t, arrow::Int32Builder>(payloadWriter, values, length);
}
extern "C"
CStatus AddInt64ToPayload(CPayloadWriter payloadWriter, int64_t *values, int length) {
return AddValuesToPayload<int64_t, arrow::Int64Builder>(payloadWriter, values, length);
}
extern "C"
CStatus AddFloatToPayload(CPayloadWriter payloadWriter, float *values, int length) {
return AddValuesToPayload<float, arrow::FloatBuilder>(payloadWriter, values, length);
}
extern "C"
CStatus AddDoubleToPayload(CPayloadWriter payloadWriter, double *values, int length) {
return AddValuesToPayload<double, arrow::DoubleBuilder>(payloadWriter, values, length);
}
extern "C"
CStatus AddOneStringToPayload(CPayloadWriter payloadWriter, char *cstr, int str_size) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadWriter *>(payloadWriter);
auto builder = std::dynamic_pointer_cast<arrow::StringBuilder>(p->builder);
if (builder == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
if (p->output != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("payload has finished");
return st;
}
arrow::Status ast;
if (cstr == nullptr || str_size < 0) {
ast = builder->AppendNull();
} else {
ast = builder->Append(cstr, str_size);
}
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
p->rows++;
return st;
}
extern "C"
CStatus AddBinaryVectorToPayload(CPayloadWriter payloadWriter, uint8_t *values, int dimension, int length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
if (length <= 0) return st;
auto p = reinterpret_cast<wrapper::PayloadWriter *>(payloadWriter);
if (p->dimension == wrapper::EMPTY_DIMENSION) {
if ((dimension % 8) || (dimension <= 0)) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect dimension value");
return st;
}
if (p->builder != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
p->builder = std::make_shared<arrow::FixedSizeBinaryBuilder>(arrow::fixed_size_binary(dimension / 8));
p->schema = arrow::schema({arrow::field("val", arrow::fixed_size_binary(dimension / 8))});
p->dimension = dimension;
} else if (p->dimension != dimension) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("dimension changed");
return st;
}
auto builder = std::dynamic_pointer_cast<arrow::FixedSizeBinaryBuilder>(p->builder);
if (builder == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
if (p->output != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("payload has finished");
return st;
}
auto ast = builder->AppendValues(values, length);
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
p->rows += length;
return st;
}
extern "C"
CStatus AddFloatVectorToPayload(CPayloadWriter payloadWriter, float *values, int dimension, int length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
if (length <= 0) return st;
auto p = reinterpret_cast<wrapper::PayloadWriter *>(payloadWriter);
if (p->dimension == wrapper::EMPTY_DIMENSION) {
if (p->builder != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
p->builder = std::make_shared<arrow::FixedSizeBinaryBuilder>(
arrow::fixed_size_binary(dimension * sizeof(float)));
p->schema = arrow::schema({arrow::field("val", arrow::fixed_size_binary(dimension * sizeof(float)))});
p->dimension = dimension;
} else if (p->dimension != dimension) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("dimension changed");
return st;
}
auto builder = std::dynamic_pointer_cast<arrow::FixedSizeBinaryBuilder>(p->builder);
if (builder == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
if (p->output != nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("payload has finished");
return st;
}
auto ast = builder->AppendValues(reinterpret_cast<const uint8_t *>(values), length);
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
p->rows += length;
return st;
}
extern "C"
CStatus FinishPayloadWriter(CPayloadWriter payloadWriter) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadWriter *>(payloadWriter);
if (p->builder == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("arrow builder is nullptr");
return st;
}
if (p->output == nullptr) {
std::shared_ptr<arrow::Array> array;
auto ast = p->builder->Finish(&array);
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
auto table = arrow::Table::Make(p->schema, {array});
p->output = std::make_shared<wrapper::PayloadOutputStream>();
auto mem_pool = arrow::default_memory_pool();
ast = parquet::arrow::WriteTable(*table, mem_pool, p->output, 1024 * 1024 * 1024,
parquet::WriterProperties::Builder().compression(arrow::Compression::ZSTD)
->compression_level(3)->build());
if (!ast.ok()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg(ast.message());
return st;
}
}
return st;
}
CBuffer GetPayloadBufferFromWriter(CPayloadWriter payloadWriter) {
CBuffer buf;
auto p = reinterpret_cast<wrapper::PayloadWriter *>(payloadWriter);
if (p->output == nullptr) {
buf.length = 0;
buf.data = nullptr;
return buf;
}
auto &output = p->output->Buffer();
buf.length = static_cast<int>(output.size());
buf.data = (char *) (output.data());
return buf;
}
int GetPayloadLengthFromWriter(CPayloadWriter payloadWriter) {
auto p = reinterpret_cast<wrapper::PayloadWriter *>(payloadWriter);
return p->rows;
}
extern "C"
void ReleasePayloadWriter(CPayloadWriter handler) {
auto p = reinterpret_cast<wrapper::PayloadWriter *>(handler);
if (p != nullptr) delete p;
arrow::default_memory_pool()->ReleaseUnused();
}
extern "C"
CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_size) {
auto p = new wrapper::PayloadReader;
p->bValues = nullptr;
p->input = std::make_shared<wrapper::PayloadInputStream>(buffer, buf_size);
auto mem_pool = arrow::default_memory_pool();
auto st = parquet::arrow::OpenFile(p->input, mem_pool, &p->reader);
if (!st.ok()) {
delete p;
return nullptr;
}
st = p->reader->ReadTable(&p->table);
if (!st.ok()) {
delete p;
return nullptr;
}
p->column = p->table->column(0);
assert(p->column != nullptr);
assert(p->column->chunks().size() == 1);
p->array = p->column->chunk(0);
switch (columnType) {
case ColumnType::BOOL :
case ColumnType::INT8 :
case ColumnType::INT16 :
case ColumnType::INT32 :
case ColumnType::INT64 :
case ColumnType::FLOAT :
case ColumnType::DOUBLE :
case ColumnType::STRING :
case ColumnType::VARCHAR:
case ColumnType::VECTOR_BINARY :
case ColumnType::VECTOR_FLOAT : {
break;
}
default: {
delete p;
return nullptr;
}
}
return reinterpret_cast<CPayloadReader>(p);
}
extern "C"
CStatus GetBoolFromPayload(CPayloadReader payloadReader, bool **values, int *length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader);
if (p->bValues == nullptr) {
auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(p->array);
if (array == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
int len = array->length();
p->bValues = new bool[len];
for (int i = 0; i < len; i++) {
p->bValues[i] = array->Value(i);
}
}
*values = p->bValues;
*length = p->array->length();
return st;
}
template<typename DT, typename AT>
CStatus GetValuesFromPayload(CPayloadReader payloadReader, DT **values, int *length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader);
auto array = std::dynamic_pointer_cast<AT>(p->array);
if (array == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("incorrect data type");
return st;
}
*length = array->length();
*values = (DT *) array->raw_values();
return st;
}
extern "C"
CStatus GetInt8FromPayload(CPayloadReader payloadReader, int8_t **values, int *length) {
return GetValuesFromPayload<int8_t, arrow::Int8Array>(payloadReader, values, length);
}
extern "C"
CStatus GetInt16FromPayload(CPayloadReader payloadReader, int16_t **values, int *length) {
return GetValuesFromPayload<int16_t, arrow::Int16Array>(payloadReader, values, length);
}
extern "C"
CStatus GetInt32FromPayload(CPayloadReader payloadReader, int32_t **values, int *length) {
return GetValuesFromPayload<int32_t, arrow::Int32Array>(payloadReader, values, length);
}
extern "C"
CStatus GetInt64FromPayload(CPayloadReader payloadReader, int64_t **values, int *length) {
return GetValuesFromPayload<int64_t, arrow::Int64Array>(payloadReader, values, length);
}
extern "C"
CStatus GetFloatFromPayload(CPayloadReader payloadReader, float **values, int *length) {
return GetValuesFromPayload<float, arrow::FloatArray>(payloadReader, values, length);
}
extern "C"
CStatus GetDoubleFromPayload(CPayloadReader payloadReader, double **values, int *length) {
return GetValuesFromPayload<double, arrow::DoubleArray>(payloadReader, values, length);
}
extern "C"
CStatus GetOneStringFromPayload(CPayloadReader payloadReader, int idx, char **cstr, int *str_size) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader);
auto array = std::dynamic_pointer_cast<arrow::StringArray>(p->array);
if (array == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("Incorrect data type");
return st;
}
if (idx >= array->length()) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("memory overflow");
return st;
}
arrow::StringArray::offset_type length;
*cstr = (char *) array->GetValue(idx, &length);
*str_size = length;
return st;
}
extern "C"
CStatus GetBinaryVectorFromPayload(CPayloadReader payloadReader, uint8_t **values, int *dimension, int *length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader);
auto array = std::dynamic_pointer_cast<arrow::FixedSizeBinaryArray>(p->array);
if (array == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("Incorrect data type");
return st;
}
*dimension = array->byte_width() * 8;
*length = array->length();
*values = (uint8_t *) array->raw_values();
return st;
}
extern "C"
CStatus GetFloatVectorFromPayload(CPayloadReader payloadReader, float **values, int *dimension, int *length) {
CStatus st;
st.error_code = static_cast<int>(ErrorCode::SUCCESS);
st.error_msg = nullptr;
auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader);
auto array = std::dynamic_pointer_cast<arrow::FixedSizeBinaryArray>(p->array);
if (array == nullptr) {
st.error_code = static_cast<int>(ErrorCode::UNEXPECTED_ERROR);
st.error_msg = ErrorMsg("Incorrect data type");
return st;
}
*dimension = array->byte_width() / sizeof(float);
*length = array->length();
*values = (float *) array->raw_values();
return st;
}
extern "C"
int GetPayloadLengthFromReader(CPayloadReader payloadReader) {
auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader);
if (p->array == nullptr) return 0;
return p->array->length();
}
extern "C"
void ReleasePayloadReader(CPayloadReader payloadReader) {
auto p = reinterpret_cast<wrapper::PayloadReader *>(payloadReader);
if (p != nullptr) {
delete[] p->bValues;
delete p;
}
arrow::default_memory_pool()->ReleaseUnused();
}

View File

@ -1,74 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
typedef struct CBuffer {
char *data;
int length;
} CBuffer;
typedef struct CStatus {
int error_code;
const char *error_msg;
} CStatus;
//============= payload writer ======================
typedef void *CPayloadWriter;
CPayloadWriter NewPayloadWriter(int columnType);
CStatus AddBooleanToPayload(CPayloadWriter payloadWriter, bool *values, int length);
CStatus AddInt8ToPayload(CPayloadWriter payloadWriter, int8_t *values, int length);
CStatus AddInt16ToPayload(CPayloadWriter payloadWriter, int16_t *values, int length);
CStatus AddInt32ToPayload(CPayloadWriter payloadWriter, int32_t *values, int length);
CStatus AddInt64ToPayload(CPayloadWriter payloadWriter, int64_t *values, int length);
CStatus AddFloatToPayload(CPayloadWriter payloadWriter, float *values, int length);
CStatus AddDoubleToPayload(CPayloadWriter payloadWriter, double *values, int length);
CStatus AddOneStringToPayload(CPayloadWriter payloadWriter, char *cstr, int str_size);
CStatus AddBinaryVectorToPayload(CPayloadWriter payloadWriter, uint8_t *values, int dimension, int length);
CStatus AddFloatVectorToPayload(CPayloadWriter payloadWriter, float *values, int dimension, int length);
CStatus FinishPayloadWriter(CPayloadWriter payloadWriter);
CBuffer GetPayloadBufferFromWriter(CPayloadWriter payloadWriter);
int GetPayloadLengthFromWriter(CPayloadWriter payloadWriter);
void ReleasePayloadWriter(CPayloadWriter handler);
//============= payload reader ======================
typedef void *CPayloadReader;
CPayloadReader NewPayloadReader(int columnType, uint8_t *buffer, int64_t buf_size);
CStatus GetBoolFromPayload(CPayloadReader payloadReader, bool **values, int *length);
CStatus GetInt8FromPayload(CPayloadReader payloadReader, int8_t **values, int *length);
CStatus GetInt16FromPayload(CPayloadReader payloadReader, int16_t **values, int *length);
CStatus GetInt32FromPayload(CPayloadReader payloadReader, int32_t **values, int *length);
CStatus GetInt64FromPayload(CPayloadReader payloadReader, int64_t **values, int *length);
CStatus GetFloatFromPayload(CPayloadReader payloadReader, float **values, int *length);
CStatus GetDoubleFromPayload(CPayloadReader payloadReader, double **values, int *length);
CStatus GetOneStringFromPayload(CPayloadReader payloadReader, int idx, char **cstr, int *str_size);
CStatus GetBinaryVectorFromPayload(CPayloadReader payloadReader, uint8_t **values, int *dimension, int *length);
CStatus GetFloatVectorFromPayload(CPayloadReader payloadReader, float **values, int *dimension, int *length);
int GetPayloadLengthFromReader(CPayloadReader payloadReader);
void ReleasePayloadReader(CPayloadReader payloadReader);
#ifdef __cplusplus
}
#endif

View File

@ -1,107 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "PayloadStream.h"
namespace wrapper {
PayloadOutputStream::PayloadOutputStream() {
buffer_.reserve(1024 * 1024);
closed_ = false;
}
PayloadOutputStream::~PayloadOutputStream() noexcept {
}
arrow::Status PayloadOutputStream::Close() {
closed_ = true;
return arrow::Status::OK();
}
arrow::Result<int64_t> PayloadOutputStream::Tell() const {
return arrow::Result<int64_t>(buffer_.size());
}
bool PayloadOutputStream::closed() const {
return closed_;
}
arrow::Status PayloadOutputStream::Write(const void *data, int64_t nbytes) {
if (nbytes <= 0) return arrow::Status::OK();
auto size = buffer_.size();
buffer_.resize(size + nbytes);
std::memcpy(buffer_.data() + size, data, nbytes);
return arrow::Status::OK();
}
arrow::Status PayloadOutputStream::Flush() {
return arrow::Status::OK();
}
const std::vector<uint8_t> &PayloadOutputStream::Buffer() const {
return buffer_;
}
PayloadInputStream::PayloadInputStream(const uint8_t *data, int64_t size) :
data_(data), size_(size), tell_(0), closed_(false) {
}
PayloadInputStream::~PayloadInputStream() noexcept {
}
arrow::Status PayloadInputStream::Close() {
closed_ = true;
return arrow::Status::OK();
}
bool PayloadInputStream::closed() const {
return closed_;
}
arrow::Result<int64_t> PayloadInputStream::Tell() const {
return arrow::Result<int64_t>(tell_);
}
arrow::Status PayloadInputStream::Seek(int64_t position) {
if (position < 0 || position >= size_) return arrow::Status::IOError("invalid position");
tell_ = position;
return arrow::Status::OK();
}
arrow::Result<int64_t> PayloadInputStream::Read(int64_t nbytes, void *out) {
auto remain = size_ - tell_;
if (nbytes > remain) nbytes = remain;
std::memcpy(out, data_ + tell_, nbytes);
tell_ += nbytes;
return arrow::Result<int64_t>(nbytes);
}
arrow::Result<std::shared_ptr<arrow::Buffer>> PayloadInputStream::Read(int64_t nbytes) {
auto remain = size_ - tell_;
if (nbytes > remain) nbytes = remain;
auto buf = std::make_shared<arrow::Buffer>(data_ + tell_, nbytes);
tell_ += nbytes;
return arrow::Result<std::shared_ptr<arrow::Buffer>>(buf);
}
arrow::Result<int64_t> PayloadInputStream::GetSize() {
return arrow::Result<int64_t>(size_);
}
}

View File

@ -1,91 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <arrow/api.h>
#include <arrow/io/interfaces.h>
#include <parquet/arrow/writer.h>
#include <parquet/arrow/reader.h>
#include "ColumnType.h"
namespace wrapper {
class PayloadOutputStream;
class PayloadInputStream;
constexpr int EMPTY_DIMENSION = -1;
struct PayloadWriter {
ColumnType columnType;
int dimension; // binary vector, float vector
std::shared_ptr<arrow::ArrayBuilder> builder;
std::shared_ptr<arrow::Schema> schema;
std::shared_ptr<PayloadOutputStream> output;
int rows;
};
struct PayloadReader {
ColumnType column_type;
std::shared_ptr<PayloadInputStream> input;
std::unique_ptr<parquet::arrow::FileReader> reader;
std::shared_ptr<arrow::Table> table;
std::shared_ptr<arrow::ChunkedArray> column;
std::shared_ptr<arrow::Array> array;
bool *bValues;
};
class PayloadOutputStream : public arrow::io::OutputStream {
public:
PayloadOutputStream();
~PayloadOutputStream();
arrow::Status Close() override;
arrow::Result<int64_t> Tell() const override;
bool closed() const override;
arrow::Status Write(const void *data, int64_t nbytes) override;
arrow::Status Flush() override;
public:
const std::vector<uint8_t> &Buffer() const;
private:
std::vector<uint8_t> buffer_;
bool closed_;
};
class PayloadInputStream : public arrow::io::RandomAccessFile {
public:
PayloadInputStream(const uint8_t *data, int64_t size);
~PayloadInputStream();
arrow::Status Close() override;
arrow::Result<int64_t> Tell() const override;
bool closed() const override;
arrow::Status Seek(int64_t position) override;
arrow::Result<int64_t> Read(int64_t nbytes, void *out) override;
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
arrow::Result<int64_t> GetSize() override;
private:
const uint8_t *data_;
const int64_t size_;
int64_t tell_;
bool closed_;
};
}

View File

@ -1,82 +0,0 @@
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/bin/bash
SOURCE=${BASH_SOURCE[0]}
while [ -h $SOURCE ]; do # resolve $SOURCE until the file is no longer a symlink
DIR=$( cd -P $( dirname $SOURCE ) && pwd )
SOURCE=$(readlink $SOURCE)
[[ $SOURCE != /* ]] && SOURCE=$DIR/$SOURCE # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
done
DIR=$( cd -P $( dirname $SOURCE ) && pwd )
# echo $DIR
CMAKE_BUILD=${DIR}/cmake_build
OUTPUT_LIB=${DIR}/output
if [ ! -d ${CMAKE_BUILD} ];then
mkdir ${CMAKE_BUILD}
fi
BUILD_TYPE="Debug"
CUSTOM_THIRDPARTY_PATH=""
while getopts "a:b:t:h:f:" arg; do
case $arg in
f)
CUSTOM_THIRDPARTY_PATH=$OPTARG
;;
t)
BUILD_TYPE=$OPTARG # BUILD_TYPE
;;
a)
GIT_ARROW_REPO=$OPTARG
;;
b)
GIT_ARROW_TAG=$OPTARG
;;
h) # help
echo "-t: build type(default: Debug)
-a: arrow repo(default: https://github.com/apache/arrow.git)
-b: arrow tag(default: apache-arrow-2.0.0)
-f: custom thirdparty path(default:)
-h: help
"
exit 0
;;
?)
echo "ERROR! unknown argument"
exit 1
;;
esac
done
echo "BUILD_TYPE: " $BUILD_TYPE
echo "CUSTOM_THIRDPARTY_PATH: " $CUSTOM_THIRDPARTY_PATH
pushd ${CMAKE_BUILD}
CMAKE_CMD="cmake \
-DCMAKE_INSTALL_PREFIX=${OUTPUT_LIB} \
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} \
-DCUSTOM_THIRDPARTY_DOWNLOAD_PATH=${CUSTOM_THIRDPARTY_PATH} .."
${CMAKE_CMD}
echo ${CMAKE_CMD}
if [[ ! ${jobs+1} ]]; then
jobs=$(nproc)
fi
make -j ${jobs} && make install

View File

@ -1,321 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <fstream>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include "ParquetWrapper.h"
#include "ColumnType.h"
#include "PayloadStream.h"
static void WriteToFile(CBuffer cb) {
auto data_file = std::ofstream("/tmp/wrapper_test_data.dat", std::ios::binary);
data_file.write(cb.data, cb.length);
data_file.close();
}
static std::shared_ptr<arrow::Table> ReadFromFile() {
std::shared_ptr<arrow::io::ReadableFile> infile;
auto rst = arrow::io::ReadableFile::Open("/tmp/wrapper_test_data.dat");
if (!rst.ok()) return nullptr;
infile = *rst;
std::shared_ptr<arrow::Table> table;
std::unique_ptr<parquet::arrow::FileReader> reader;
auto st = parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader);
if (!st.ok()) return nullptr;
st = reader->ReadTable(&table);
if (!st.ok()) return nullptr;
return table;
}
TEST(wrapper, inoutstream) {
arrow::Int64Builder i64builder;
arrow::Status st;
st = i64builder.AppendValues({1, 2, 3, 4, 5});
ASSERT_TRUE(st.ok());
std::shared_ptr<arrow::Array> i64array;
st = i64builder.Finish(&i64array);
ASSERT_TRUE(st.ok());
auto schema = arrow::schema({arrow::field("val", arrow::int64())});
ASSERT_NE(schema, nullptr);
auto table = arrow::Table::Make(schema, {i64array});
ASSERT_NE(table, nullptr);
auto os = std::make_shared<wrapper::PayloadOutputStream>();
st = parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), os, 1024);
ASSERT_TRUE(st.ok());
const uint8_t *buf = os->Buffer().data();
int64_t buf_size = os->Buffer().size();
auto is = std::make_shared<wrapper::PayloadInputStream>(buf, buf_size);
std::shared_ptr<arrow::Table> intable;
std::unique_ptr<parquet::arrow::FileReader> reader;
st = parquet::arrow::OpenFile(is, arrow::default_memory_pool(), &reader);
ASSERT_TRUE(st.ok());
st = reader->ReadTable(&intable);
ASSERT_TRUE(st.ok());
auto chunks = intable->column(0)->chunks();
ASSERT_EQ(chunks.size(), 1);
auto inarray = std::dynamic_pointer_cast<arrow::Int64Array>(chunks[0]);
ASSERT_NE(inarray, nullptr);
ASSERT_EQ(inarray->Value(0), 1);
ASSERT_EQ(inarray->Value(1), 2);
ASSERT_EQ(inarray->Value(2), 3);
ASSERT_EQ(inarray->Value(3), 4);
ASSERT_EQ(inarray->Value(4), 5);
}
TEST(wrapper, boolean) {
auto payload = NewPayloadWriter(ColumnType::BOOL);
bool data[] = {true, false, true, false};
auto st = AddBooleanToPayload(payload, data, 4);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = FinishPayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto cb = GetPayloadBufferFromWriter(payload);
ASSERT_GT(cb.length, 0);
ASSERT_NE(cb.data, nullptr);
auto nums = GetPayloadLengthFromWriter(payload);
ASSERT_EQ(nums, 4);
auto reader = NewPayloadReader(ColumnType::BOOL, (uint8_t *) cb.data, cb.length);
bool *values;
int length;
st = GetBoolFromPayload(reader, &values, &length);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_NE(values, nullptr);
ASSERT_EQ(length, 4);
length = GetPayloadLengthFromReader(reader);
ASSERT_EQ(length, 4);
for (int i = 0; i < length; i++) {
ASSERT_EQ(data[i], values[i]);
}
st = ReleasePayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = ReleasePayloadReader(reader);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
}
#define NUMERIC_TEST(TEST_NAME, COLUMN_TYPE, DATA_TYPE, ADD_FUNC, GET_FUNC, ARRAY_TYPE) TEST(wrapper, TEST_NAME) { \
auto payload = NewPayloadWriter(COLUMN_TYPE); \
DATA_TYPE data[] = {-1, 1, -100, 100}; \
\
auto st = ADD_FUNC(payload, data, 4); \
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS); \
st = FinishPayloadWriter(payload); \
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS); \
auto cb = GetPayloadBufferFromWriter(payload); \
ASSERT_GT(cb.length, 0); \
ASSERT_NE(cb.data, nullptr); \
auto nums = GetPayloadLengthFromWriter(payload); \
ASSERT_EQ(nums, 4); \
\
auto reader = NewPayloadReader(COLUMN_TYPE,(uint8_t*)cb.data,cb.length); \
DATA_TYPE *values; \
int length; \
st = GET_FUNC(reader,&values,&length); \
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS); \
ASSERT_NE(values, nullptr); \
ASSERT_EQ(length, 4); \
length = GetPayloadLengthFromReader(reader); \
ASSERT_EQ(length, 4); \
\
for (int i = 0; i < length; i++) { \
ASSERT_EQ(data[i], values[i]); \
} \
\
st = ReleasePayloadWriter(payload); \
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS); \
st = ReleasePayloadReader(reader); \
ASSERT_EQ(st.error_code,ErrorCode::SUCCESS); \
}
NUMERIC_TEST(int8, ColumnType::INT8, int8_t, AddInt8ToPayload, GetInt8FromPayload, arrow::Int8Array)
NUMERIC_TEST(int16, ColumnType::INT16, int16_t, AddInt16ToPayload, GetInt16FromPayload, arrow::Int16Array)
NUMERIC_TEST(int32, ColumnType::INT32, int32_t, AddInt32ToPayload, GetInt32FromPayload, arrow::Int32Array)
NUMERIC_TEST(int64, ColumnType::INT64, int64_t, AddInt64ToPayload, GetInt64FromPayload, arrow::Int64Array)
NUMERIC_TEST(float32, ColumnType::FLOAT, float, AddFloatToPayload, GetFloatFromPayload, arrow::FloatArray)
NUMERIC_TEST(float64, ColumnType::DOUBLE, double, AddDoubleToPayload, GetDoubleFromPayload, arrow::DoubleArray)
TEST(wrapper, stringarray) {
auto payload = NewPayloadWriter(ColumnType::STRING);
auto st = AddOneStringToPayload(payload, (char *) "1234", 4);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = AddOneStringToPayload(payload, (char *) "12345", 5);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
char v[3] = {0};
v[1] = 'a';
st = AddOneStringToPayload(payload, v, 3);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = FinishPayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto cb = GetPayloadBufferFromWriter(payload);
ASSERT_GT(cb.length, 0);
ASSERT_NE(cb.data, nullptr);
auto nums = GetPayloadLengthFromWriter(payload);
ASSERT_EQ(nums, 3);
auto reader = NewPayloadReader(ColumnType::STRING, (uint8_t *) cb.data, cb.length);
int length = GetPayloadLengthFromReader(reader);
ASSERT_EQ(length, 3);
char *v0, *v1, *v2;
int s0, s1, s2;
st = GetOneStringFromPayload(reader, 0, &v0, &s0);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_EQ(s0, 4);
ASSERT_EQ(v0[0], '1');
ASSERT_EQ(v0[1], '2');
ASSERT_EQ(v0[2], '3');
ASSERT_EQ(v0[3], '4');
st = GetOneStringFromPayload(reader, 1, &v1, &s1);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_EQ(s1, 5);
ASSERT_EQ(v1[0], '1');
ASSERT_EQ(v1[1], '2');
ASSERT_EQ(v1[2], '3');
ASSERT_EQ(v1[3], '4');
ASSERT_EQ(v1[4], '5');
st = GetOneStringFromPayload(reader, 2, &v2, &s2);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_EQ(s2, 3);
ASSERT_EQ(v2[0], 0);
ASSERT_EQ(v2[1], 'a');
ASSERT_EQ(v2[2], 0);
st = ReleasePayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = ReleasePayloadReader(reader);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
}
TEST(wrapper, binary_vector) {
auto payload = NewPayloadWriter(ColumnType::VECTOR_BINARY);
uint8_t data[] = {0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8};
auto st = AddBinaryVectorToPayload(payload, data, 16, 4);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = FinishPayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto cb = GetPayloadBufferFromWriter(payload);
ASSERT_GT(cb.length, 0);
ASSERT_NE(cb.data, nullptr);
auto nums = GetPayloadLengthFromWriter(payload);
ASSERT_EQ(nums, 4);
auto reader = NewPayloadReader(ColumnType::VECTOR_BINARY, (uint8_t *) cb.data, cb.length);
uint8_t *values;
int length;
int dim;
st = GetBinaryVectorFromPayload(reader, &values, &dim, &length);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_NE(values, nullptr);
ASSERT_EQ(dim, 16);
ASSERT_EQ(length, 4);
length = GetPayloadLengthFromReader(reader);
ASSERT_EQ(length, 4);
for (int i = 0; i < 8; i++) {
ASSERT_EQ(values[i], data[i]);
}
st = ReleasePayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = ReleasePayloadReader(reader);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
}
TEST(wrapper, float_vector) {
auto payload = NewPayloadWriter(ColumnType::VECTOR_FLOAT);
float data[] = {1, 2, 3, 4, 5, 6, 7, 8};
auto st = AddFloatVectorToPayload(payload, data, 2, 4);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = FinishPayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto cb = GetPayloadBufferFromWriter(payload);
ASSERT_GT(cb.length, 0);
ASSERT_NE(cb.data, nullptr);
auto nums = GetPayloadLengthFromWriter(payload);
ASSERT_EQ(nums, 4);
auto reader = NewPayloadReader(ColumnType::VECTOR_FLOAT, (uint8_t *) cb.data, cb.length);
float *values;
int length;
int dim;
st = GetFloatVectorFromPayload(reader, &values, &dim, &length);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
ASSERT_NE(values, nullptr);
ASSERT_EQ(dim, 2);
ASSERT_EQ(length, 4);
length = GetPayloadLengthFromReader(reader);
ASSERT_EQ(length, 4);
for (int i = 0; i < 8; i++) {
ASSERT_EQ(values[i], data[i]);
}
st = ReleasePayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = ReleasePayloadReader(reader);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
}
TEST(wrapper, int8_2) {
auto payload = NewPayloadWriter(ColumnType::INT8);
int8_t data[] = {-1, 1, -100, 100};
auto st = AddInt8ToPayload(payload, data, 4);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
st = FinishPayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto cb = GetPayloadBufferFromWriter(payload);
ASSERT_GT(cb.length, 0);
ASSERT_NE(cb.data, nullptr);
WriteToFile(cb);
auto nums = GetPayloadLengthFromWriter(payload);
ASSERT_EQ(nums, 4);
st = ReleasePayloadWriter(payload);
ASSERT_EQ(st.error_code, ErrorCode::SUCCESS);
auto table = ReadFromFile();
ASSERT_NE(table, nullptr);
auto chunks = table->column(0)->chunks();
ASSERT_EQ(chunks.size(), 1);
auto bool_array = std::dynamic_pointer_cast<arrow::Int8Array>(chunks[0]);
ASSERT_NE(bool_array, nullptr);
ASSERT_EQ(bool_array->Value(0), -1);
ASSERT_EQ(bool_array->Value(1), 1);
ASSERT_EQ(bool_array->Value(2), -100);
ASSERT_EQ(bool_array->Value(3), 100);
}

View File

@ -17,13 +17,10 @@
package storage
/*
#cgo CFLAGS: -I${SRCDIR}/cwrapper
#cgo pkg-config: milvus_storage
#cgo linux LDFLAGS: -L${SRCDIR}/cwrapper/output/lib -L${SRCDIR}/cwrapper/output/lib64 -lwrapper -lparquet -larrow -larrow_bundled_dependencies -lstdc++ -lm
#cgo darwin LDFLAGS: -L${SRCDIR}/cwrapper/output/lib -lwrapper -lparquet -larrow -larrow_bundled_dependencies -lstdc++ -lm
#cgo windows LDFLAGS: -L${SRCDIR}/cwrapper/output/lib -lwrapper -lparquet -larrow -lstdc++
#include <stdlib.h>
#include "ParquetWrapper.h"
#include "storage/parquet_c.h"
*/
import "C"
import (

View File

@ -1,13 +1,10 @@
package storage
/*
#cgo CFLAGS: -I${SRCDIR}/cwrapper
#cgo pkg-config: milvus_storage
#cgo linux LDFLAGS: -L${SRCDIR}/cwrapper/output/lib -L${SRCDIR}/cwrapper/output/lib64 -L${SRCDIR}/../core/output/lib -L${SRCDIR}/../core/output/lib64 -lwrapper -lparquet -larrow -larrow_bundled_dependencies -lstdc++ -lm
#cgo darwin LDFLAGS: -L${SRCDIR}/cwrapper/output/lib -L${SRCDIR}/../core/output/lib -lwrapper -lparquet -larrow -larrow_bundled_dependencies -lstdc++ -lm
#cgo windows LDFLAGS: -L${SRCDIR}/cwrapper/output/lib -L${SRCDIR}/../core/output/lib -lwrapper -lparquet -larrow -lstdc++
#include <stdlib.h>
#include "ParquetWrapper.h"
#include "storage/parquet_c.h"
*/
import "C"
import (

View File

@ -1,16 +1,10 @@
package indexcgowrapper
/*
#cgo CFLAGS: -I${SRCDIR}/../../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_common -Wl,-rpath,"${SRCDIR}/../../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_common -Wl,-rpath=${SRCDIR}/../../core/output/lib
#cgo windows LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_common -Wl,-rpath=${SRCDIR}/../../core/output/lib
#cgo pkg-config: milvus_common
#include <stdlib.h> // free
#include "indexbuilder/index_c.h"
*/
import "C"
import (

View File

@ -1,16 +1,10 @@
package indexcgowrapper
/*
#cgo CFLAGS: -I${SRCDIR}/../../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_indexbuilder -Wl,-rpath,"${SRCDIR}/../../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_indexbuilder -Wl,-rpath=${SRCDIR}/../../core/output/lib
#cgo windows LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_indexbuilder -Wl,-rpath=${SRCDIR}/../core/output/lib
#cgo pkg-config: milvus_indexbuilder
#include <stdlib.h> // free
#include "indexbuilder/index_c.h"
*/
import "C"
import (

View File

@ -12,11 +12,7 @@
package metricsinfo
/*
#cgo CFLAGS: -I${SRCDIR}/../../core/output/include
#cgo darwin LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_common -lmilvus_segcore -Wl,-rpath,"${SRCDIR}/../../core/output/lib"
#cgo linux LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_common -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../../core/output/lib
#cgo windows LDFLAGS: -L${SRCDIR}/../../core/output/lib -lmilvus_common -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../../core/output/lib
#cgo pkg-config: milvus_common milvus_segcore
#include <stdlib.h>
#include "common/vector_index_c.h"

View File

@ -1,105 +0,0 @@
#!/bin/bash
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SOURCE=${BASH_SOURCE[0]}
while [ -h $SOURCE ]; do # resolve $SOURCE until the file is no longer a symlink
DIR=$( cd -P $( dirname $SOURCE ) && pwd )
SOURCE=$(readlink $SOURCE)
[[ $SOURCE != /* ]] && SOURCE=$DIR/$SOURCE # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
done
DIR=$( cd -P $( dirname $SOURCE ) && pwd )
# DIR=${DIR}/../internal/storage/cwrapper
CMAKE_BUILD=${DIR}/../cwrapper_build
OUTPUT_LIB=${DIR}/../internal/storage/cwrapper/output
SRC_DIR=${DIR}/../internal/storage/cwrapper
if [ ! -d ${CMAKE_BUILD} ];then
mkdir ${CMAKE_BUILD}
fi
mkdir -p ${OUTPUT_LIB}
BUILD_TYPE="Debug"
CUSTOM_THIRDPARTY_PATH=""
EMBEDDED_MILVUS="OFF"
while getopts "a:g:t:h:f:b" arg; do
case $arg in
f)
CUSTOM_THIRDPARTY_PATH=$OPTARG
;;
t)
BUILD_TYPE=$OPTARG # BUILD_TYPE
;;
a)
GIT_ARROW_REPO=$OPTARG
;;
g)
GIT_ARROW_TAG=$OPTARG
;;
h) # help
echo "-t: build type(default: Debug)
-a: arrow repo(default: https://github.com/apache/arrow.git)
-g: arrow tag(default: apache-arrow-2.0.0)
-f: custom thirdparty path(default: "")
-h: help
-b: build embedded milvus(default: OFF)
"
exit 0
;;
b)
EMBEDDED_MILVUS="ON"
;;
?)
echo "ERROR! unknown argument"
exit 1
;;
esac
done
echo "BUILD_TYPE: " $BUILD_TYPE
echo "CUSTOM_THIRDPARTY_PATH: " $CUSTOM_THIRDPARTY_PATH
# MSYS system
CMAKE_GENERATOR="Unix Makefiles"
if [ "$MSYSTEM" == "MINGW64" ] ; then
CMAKE_GENERATOR="MSYS Makefiles"
fi
pushd ${CMAKE_BUILD}
CMAKE_CMD="cmake \
${CMAKE_EXTRA_ARGS} \
-DCMAKE_INSTALL_PREFIX=${OUTPUT_LIB} \
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} \
-DEMBEDDED_MILVUS=${EMBEDDED_MILVUS} \
-DCUSTOM_THIRDPARTY_DOWNLOAD_PATH=${CUSTOM_THIRDPARTY_PATH} ${SRC_DIR}"
${CMAKE_CMD} -G "${CMAKE_GENERATOR}"
echo ${CMAKE_CMD}
if [[ ! ${jobs+1} ]]; then
unameOut="$(uname -s)"
case "${unameOut}" in
Linux*) jobs=$(nproc);;
MINGW64*) jobs=$(nproc);;
Darwin*) jobs=$(sysctl -n hw.physicalcpu);;
*) echo "UNKNOWN:${unameOut}" ; exit 0;
esac
fi
make -j ${jobs} && make install

View File

@ -1,124 +0,0 @@
#!/bin/bash
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SOURCE=${BASH_SOURCE[0]}
while [ -h $SOURCE ]; do # resolve $SOURCE until the file is no longer a symlink
DIR=$( cd -P $( dirname $SOURCE ) && pwd )
SOURCE=$(readlink $SOURCE)
[[ $SOURCE != /* ]] && SOURCE=$DIR/$SOURCE # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
done
DIR=$( cd -P $( dirname $SOURCE ) && pwd )
CMAKE_BUILD=${DIR}/../cwrapper_rocksdb_build
OUTPUT_LIB=${DIR}/../internal/kv/rocksdb/cwrapper/output
SRC_DIR=${DIR}/../internal/kv/rocksdb/cwrapper
if [ ! -d ${CMAKE_BUILD} ];then
mkdir ${CMAKE_BUILD}
fi
if [ ! -d ${OUTPUT_LIB} ];then
mkdir ${OUTPUT_LIB}
fi
BUILD_TYPE="Debug"
CUSTOM_THIRDPARTY_PATH=""
EMBEDDED_MILVUS="OFF"
while getopts "t:h:f:b" arg; do
case $arg in
f)
CUSTOM_THIRDPARTY_PATH=$OPTARG
;;
t)
BUILD_TYPE=$OPTARG # BUILD_TYPE
;;
h) # help
echo "-t: build type(default: Debug)
-f: custom thirdparty path(default: "")
-h: help
-b: build embedded milvus(default: OFF)
"
exit 0
;;
b)
EMBEDDED_MILVUS="ON"
;;
?)
echo "ERROR! unknown argument"
exit 1
;;
esac
done
echo "BUILD_TYPE: " $BUILD_TYPE
echo "CUSTOM_THIRDPARTY_PATH: " $CUSTOM_THIRDPARTY_PATH
pushd ${CMAKE_BUILD}
CMAKE_CMD="cmake \
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} \
-DCMAKE_INSTALL_PREFIX=${OUTPUT_LIB} \
-DEMBEDDED_MILVUS=${EMBEDDED_MILVUS} \
-DCUSTOM_THIRDPARTY_DOWNLOAD_PATH=${CUSTOM_THIRDPARTY_PATH} ${SRC_DIR}"
unameOut="$(uname -s)"
if [[ ! ${jobs+1} ]]; then
case "${unameOut}" in
Linux*) jobs=$(nproc);;
Darwin*) jobs=$(sysctl -n hw.physicalcpu);;
*) echo "UNKNOWN:${unameOut}"; exit 0;
esac
fi
if [ "$MSYSTEM" == "MINGW64" ] ; then
echo Using system rocksdb
else
echo ${CMAKE_CMD}
${CMAKE_CMD}
make -j ${jobs}
fi
go env -w CGO_CFLAGS="-I${OUTPUT_LIB}/include"
ldflags=""
if [ -f "${OUTPUT_LIB}/lib/librocksdb.a" ]; then
case "${unameOut}" in
Linux*) ldflags="-L${OUTPUT_LIB}/lib -l:librocksdb.a -lstdc++ -lm -lz";;
Darwin*) ldflags="-L${OUTPUT_LIB}/lib -lrocksdb -stdlib=libc++ -lm -lz -lbz2 -ldl";;
*) echo "UNKNOWN:${unameOut}"; exit 0;
esac
else
case "${unameOut}" in
Linux*) ldflags="-L${OUTPUT_LIB}/lib64 -l:librocksdb.a -lstdc++ -lm -lz";;
Darwin*) ldflags="-L${OUTPUT_LIB}/lib64 -lrocksdb -stdlib=libc++ -lm -lz -lbz2 -ldl";;
*) echo "UNKNOWN:${unameOut}" ; exit 0;
esac
fi
if [ "$MSYSTEM" == "MINGW64" ] ; then
ldflags="-L${OUTPUT_LIB}/lib -lrocksdb -lstdc++ -lm -lz -lshlwapi -lrpcrt4"
fi
if [[ $(arch) == 'arm64' ]]; then
go env -w GOARCH=arm64
fi
go env -w CGO_LDFLAGS="$ldflags" && GO111MODULE=on
go get github.com/tecbot/gorocksdb

View File

@ -21,8 +21,8 @@ pacman -S --noconfirm --needed \
mingw-w64-x86_64-python2 \
mingw-w64-x86_64-diffutils \
mingw-w64-x86_64-arrow \
mingw-w64-x86_64-go \
mingw-w64-x86_64-rocksdb
mingw-w64-x86_64-go
# mingw-w64-x86_64-rocksdb
# dummy empty dl, TODO: remove later

View File

@ -22,13 +22,24 @@ FILE_COVERAGE_HTML="go_coverage.html"
set -ex
echo "mode: atomic" > ${FILE_COVERAGE_INFO}
SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
SOURCE="$(readlink "$SOURCE")"
[[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
done
ROOT_DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )"
export PKG_CONFIG_PATH="${PKG_CONFIG_PATH}:$ROOT_DIR/internal/core/output/lib/pkgconfig:$ROOT_DIR/internal/core/output/lib64/pkgconfig"
export RPATH=$(pkg-config --libs-only-L milvus_common | cut -c 3-)
# run unittest
echo "Running unittest under ./internal"
if [[ $(uname -s) == "Darwin" && "$(uname -m)" == "arm64" ]]; then
APPLE_SILICON_FLAG="-tags dynamic"
fi
for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated); do
go test -race ${APPLE_SILICON_FLAG} -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
go test -race ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
if [ -f profile.out ]; then
grep -v kafka profile.out | grep -v planparserv2/generated | sed '1d' >> ${FILE_COVERAGE_INFO}
rm profile.out

View File

@ -27,6 +27,9 @@ while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symli
done
ROOT_DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )"
export PKG_CONFIG_PATH="${PKG_CONFIG_PATH}:$ROOT_DIR/internal/core/output/lib/pkgconfig:$ROOT_DIR/internal/core/output/lib64/pkgconfig"
export RPATH=$(pkg-config --libs-only-L milvus_common | cut -c 3-)
if [[ $(uname -s) == "Darwin" ]]; then
export MallocNanoZone=0
fi
@ -39,30 +42,30 @@ fi
MILVUS_DIR="${ROOT_DIR}/internal/"
echo "Running go unittest under $MILVUS_DIR"
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/allocator/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/kv/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/storage" -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/tso/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/funcutil/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/paramtable/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/retry/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/sessionutil/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/trace/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/typeutil/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/util/importutil/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/proxy/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/datanode/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/indexnode/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/querynode/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/distributed/rootcoord/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/distributed/datacoord/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/distributed/querycoord/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/distributed/proxy/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/distributed/datanode/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/distributed/querynode/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/rootcoord" -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/datacoord/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/indexcoord/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/allocator/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/kv/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/storage" -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/tso/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/util/funcutil/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/util/paramtable/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/util/retry/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/util/sessionutil/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/util/trace/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/util/typeutil/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/util/importutil/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/proxy/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/datanode/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/indexnode/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/querynode/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/distributed/rootcoord/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/distributed/datacoord/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/distributed/querycoord/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/distributed/proxy/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/distributed/datanode/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/distributed/querynode/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/rootcoord" -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/datacoord/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} -ldflags="-r $RPATH" "${MILVUS_DIR}/indexcoord/..." -failfast
echo " Go unittest finished"