enhance: Add buffered writer to reduce fwrite syscall (#38570)

Related to previous PR #38157

If mmapped row is too small, frequent fwrite call still cost too much
cpu time for context switching. This PR add buffered write to avoid this
bad case with extra buffer per variable field.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/38789/head
congqixia 2024-12-27 12:20:50 +08:00 committed by GitHub
parent 4df444ef25
commit 19052ef3e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 101 additions and 43 deletions

View File

@ -21,6 +21,13 @@
#include <unistd.h>
namespace milvus {
#define THROW_FILE_WRITE_ERROR \
PanicInfo(ErrorCode::FileWriteFailed, \
fmt::format("write data to file {} failed, error code {}", \
file_.Path(), \
strerror(errno)));
class File {
public:
File(const File& file) = delete;
@ -36,12 +43,26 @@ class File {
static File
Open(const std::string_view filepath, int flags) {
// using default buf size = 4096
return Open(filepath, flags, 4096);
}
static File
Open(const std::string_view filepath, int flags, size_t buf_size) {
int fd = open(filepath.data(), flags, S_IRUSR | S_IWUSR);
AssertInfo(fd != -1,
"failed to create mmap file {}: {}",
filepath,
strerror(errno));
return File(fd, std::string(filepath));
FILE* fs = fdopen(fd, "wb+");
AssertInfo(fs != nullptr,
"failed to open file {}: {}",
filepath,
strerror(errno));
auto f = File(fd, fs, std::string(filepath));
// setup buffer size file stream will use
setvbuf(f.fs_, nullptr, _IOFBF, buf_size);
return f;
}
int
@ -94,16 +115,71 @@ class File {
}
private:
explicit File(int fd, const std::string& filepath)
: fd_(fd), filepath_(filepath) {
fs_ = fdopen(fd_, "wb+");
AssertInfo(fs_ != nullptr,
"failed to open file {}: {}",
filepath,
strerror(errno));
explicit File(int fd, FILE* fs, const std::string& filepath)
: fd_(fd), filepath_(filepath), fs_(fs) {
}
int fd_{-1};
FILE* fs_;
std::string filepath_;
};
class BufferedWriter {
public:
// Constructor: Initialize with the file pointer and the buffer size (default 4KB).
explicit BufferedWriter(File& file, size_t buffer_size = 4096)
: file_(file),
buffer_size_(buffer_size),
buffer_(new char[buffer_size]) {
}
~BufferedWriter() {
// Ensure the buffer is flushed when the object is destroyed
flush();
delete[] buffer_;
}
// Write method to handle data larger than the buffer
void
Write(const void* data, size_t size) {
if (size > buffer_size_) {
flush();
ssize_t written_data_size = file_.FWrite(data, size);
if (written_data_size != size) {
THROW_FILE_WRITE_ERROR
}
return;
}
if (buffer_pos_ + size > buffer_size_) {
flush();
}
std::memcpy(buffer_ + buffer_pos_, data, size);
buffer_pos_ += size;
}
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
void
WriteInt(T value) {
Write(&value, sizeof(value));
}
// Flush method: Write the contents of the buffer to the file
void
flush() {
if (buffer_pos_ > 0) {
ssize_t written_data_size = file_.FWrite(buffer_, buffer_pos_);
if (written_data_size != buffer_pos_) {
THROW_FILE_WRITE_ERROR
}
buffer_pos_ = 0;
}
}
private:
File& file_; // File pointer
size_t buffer_size_; // Size of the internal buffer
char* buffer_; // The buffer itself
size_t buffer_pos_{0}; // Current position in the buffer
};
} // namespace milvus

View File

@ -93,6 +93,9 @@ WriteFieldData(File& file,
std::vector<std::vector<uint64_t>>& element_indices,
FixedVector<bool>& valid_data) {
if (IsVariableDataType(data_type)) {
// use buffered writer to reduce fwrite/write syscall
// buffer size = 1024*1024 = 1MB
BufferedWriter bw = BufferedWriter(file, 1048576);
switch (data_type) {
case DataType::VARCHAR:
case DataType::STRING: {
@ -101,17 +104,10 @@ WriteFieldData(File& file,
indices.push_back(total_written);
auto str =
static_cast<const std::string*>(data->RawValue(i));
ssize_t written_data_size =
file.FWriteInt<uint32_t>(uint32_t(str->size()));
if (written_data_size != sizeof(uint32_t)) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data_size;
auto written_data = file.FWrite(str->data(), str->size());
if (written_data < str->size()) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data;
bw.WriteInt<uint32_t>(static_cast<uint32_t>(str->size()));
total_written += sizeof(uint32_t);
bw.Write(str->data(), str->size());
total_written += str->size();
}
break;
}
@ -121,18 +117,11 @@ WriteFieldData(File& file,
indices.push_back(total_written);
auto padded_string =
static_cast<const Json*>(data->RawValue(i))->data();
ssize_t written_data_size = file.FWriteInt<uint32_t>(
uint32_t(padded_string.size()));
if (written_data_size != sizeof(uint32_t)) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data_size;
ssize_t written_data =
file.FWrite(padded_string.data(), padded_string.size());
if (written_data < padded_string.size()) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data;
bw.WriteInt<uint32_t>(
static_cast<uint32_t>(padded_string.size()));
total_written += padded_string.size();
bw.Write(padded_string.data(), padded_string.size());
total_written += padded_string.size();
}
break;
}
@ -141,13 +130,9 @@ WriteFieldData(File& file,
for (size_t i = 0; i < data->get_num_rows(); ++i) {
indices.push_back(total_written);
auto array = static_cast<const Array*>(data->RawValue(i));
ssize_t written =
file.FWrite(array->data(), array->byte_size());
if (written < array->byte_size()) {
THROW_FILE_WRITE_ERROR
}
bw.Write(array->data(), array->byte_size());
element_indices.emplace_back(array->get_offsets());
total_written += written;
total_written += array->byte_size();
}
break;
}
@ -157,12 +142,8 @@ WriteFieldData(File& file,
auto vec =
static_cast<const knowhere::sparse::SparseRow<float>*>(
data->RawValue(i));
ssize_t written =
file.FWrite(vec->data(), vec->data_byte_size());
if (written < vec->data_byte_size()) {
break;
}
total_written += written;
bw.Write(vec->data(), vec->data_byte_size());
total_written += vec->data_byte_size();
}
break;
}
@ -171,6 +152,7 @@ WriteFieldData(File& file,
"not supported data type {}",
GetDataTypeName(data_type));
}
bw.flush();
} else {
// write as: data|data|data|data|data|data......
size_t written = file.FWrite(data->Data(), data->DataSize());