enhance: Use fdopen, fwrite to reduce direct syscall (#38157)

`File.Write` and `File.WriteInt` use `write`, which may be just direct
syscall in some systems. When mappding field data and write line by
line, this could cost lost of CPU time when the row number is large.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/38179/head
congqixia 2024-12-03 15:24:39 +08:00 committed by GitHub
parent 28d39399e2
commit 767b7e6218
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 11 deletions

View File

@ -11,11 +11,13 @@
#pragma once #pragma once
#include <cstdio>
#include <string> #include <string>
#include "common/EasyAssert.h" #include "common/EasyAssert.h"
#include "common/Types.h" #include "common/Types.h"
#include "fmt/core.h" #include "fmt/core.h"
#include <fcntl.h> #include <fcntl.h>
#include <sys/types.h>
#include <unistd.h> #include <unistd.h>
namespace milvus { namespace milvus {
@ -27,8 +29,8 @@ class File {
file.fd_ = -1; file.fd_ = -1;
} }
~File() { ~File() {
if (fd_ >= 0) { if (fs_ != nullptr) {
close(fd_); fclose(fs_);
} }
} }
@ -63,6 +65,22 @@ class File {
return write(fd_, &value, sizeof(value)); return write(fd_, &value, sizeof(value));
} }
ssize_t
FWrite(const void* buf, size_t size) {
return fwrite(buf, sizeof(char), size, fs_);
}
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
ssize_t
FWriteInt(T value) {
return fwrite(&value, 1, sizeof(value), fs_);
}
int
FFlush() {
return fflush(fs_);
}
offset_t offset_t
Seek(offset_t offset, int whence) { Seek(offset_t offset, int whence) {
return lseek(fd_, offset, whence); return lseek(fd_, offset, whence);
@ -70,15 +88,22 @@ class File {
void void
Close() { Close() {
close(fd_); fclose(fs_);
fs_ = nullptr;
fd_ = -1; fd_ = -1;
} }
private: private:
explicit File(int fd, const std::string& filepath) explicit File(int fd, const std::string& filepath)
: fd_(fd), filepath_(filepath) { : fd_(fd), filepath_(filepath) {
fs_ = fdopen(fd_, "wb+");
AssertInfo(fs_ != nullptr,
"failed to open file {}: {}",
filepath,
strerror(errno));
} }
int fd_{-1}; int fd_{-1};
FILE* fs_;
std::string filepath_; std::string filepath_;
}; };
} // namespace milvus } // namespace milvus

View File

@ -102,12 +102,12 @@ WriteFieldData(File& file,
auto str = auto str =
static_cast<const std::string*>(data->RawValue(i)); static_cast<const std::string*>(data->RawValue(i));
ssize_t written_data_size = ssize_t written_data_size =
file.WriteInt<uint32_t>(uint32_t(str->size())); file.FWriteInt<uint32_t>(uint32_t(str->size()));
if (written_data_size != sizeof(uint32_t)) { if (written_data_size != sizeof(uint32_t)) {
THROW_FILE_WRITE_ERROR THROW_FILE_WRITE_ERROR
} }
total_written += written_data_size; total_written += written_data_size;
auto written_data = file.Write(str->data(), str->size()); auto written_data = file.FWrite(str->data(), str->size());
if (written_data < str->size()) { if (written_data < str->size()) {
THROW_FILE_WRITE_ERROR THROW_FILE_WRITE_ERROR
} }
@ -121,14 +121,14 @@ WriteFieldData(File& file,
indices.push_back(total_written); indices.push_back(total_written);
auto padded_string = auto padded_string =
static_cast<const Json*>(data->RawValue(i))->data(); static_cast<const Json*>(data->RawValue(i))->data();
ssize_t written_data_size = ssize_t written_data_size = file.FWriteInt<uint32_t>(
file.WriteInt<uint32_t>(uint32_t(padded_string.size())); uint32_t(padded_string.size()));
if (written_data_size != sizeof(uint32_t)) { if (written_data_size != sizeof(uint32_t)) {
THROW_FILE_WRITE_ERROR THROW_FILE_WRITE_ERROR
} }
total_written += written_data_size; total_written += written_data_size;
ssize_t written_data = ssize_t written_data =
file.Write(padded_string.data(), padded_string.size()); file.FWrite(padded_string.data(), padded_string.size());
if (written_data < padded_string.size()) { if (written_data < padded_string.size()) {
THROW_FILE_WRITE_ERROR THROW_FILE_WRITE_ERROR
} }
@ -142,7 +142,7 @@ WriteFieldData(File& file,
indices.push_back(total_written); indices.push_back(total_written);
auto array = static_cast<const Array*>(data->RawValue(i)); auto array = static_cast<const Array*>(data->RawValue(i));
ssize_t written = ssize_t written =
file.Write(array->data(), array->byte_size()); file.FWrite(array->data(), array->byte_size());
if (written < array->byte_size()) { if (written < array->byte_size()) {
THROW_FILE_WRITE_ERROR THROW_FILE_WRITE_ERROR
} }
@ -158,7 +158,7 @@ WriteFieldData(File& file,
static_cast<const knowhere::sparse::SparseRow<float>*>( static_cast<const knowhere::sparse::SparseRow<float>*>(
data->RawValue(i)); data->RawValue(i));
ssize_t written = ssize_t written =
file.Write(vec->data(), vec->data_byte_size()); file.FWrite(vec->data(), vec->data_byte_size());
if (written < vec->data_byte_size()) { if (written < vec->data_byte_size()) {
break; break;
} }
@ -173,7 +173,7 @@ WriteFieldData(File& file,
} }
} else { } else {
// write as: data|data|data|data|data|data...... // write as: data|data|data|data|data|data......
size_t written = file.Write(data->Data(), data->DataSize()); size_t written = file.FWrite(data->Data(), data->DataSize());
if (written < data->DataSize()) { if (written < data->DataSize()) {
THROW_FILE_WRITE_ERROR THROW_FILE_WRITE_ERROR
} }
@ -191,5 +191,6 @@ WriteFieldData(File& file,
valid_data.push_back(data->is_valid(i)); valid_data.push_back(data->is_valid(i));
} }
} }
file.FFlush();
} }
} // namespace milvus } // namespace milvus