Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,16 @@ else ()
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g")
elseif (CMAKE_BUILD_TYPE STREQUAL "Release")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2")
# -flto + MinGW gcc + statically-linked antlr4_static produces
# unresolved-reference errors at link time (LTO intermediate objects
# can't see the .a's vtable thunks). -march=native is also a poor
# default for CI binaries shipped to other machines. Keep both on
# Linux/macOS where the optimization actually pays off.
if (MINGW OR WIN32)
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
else ()
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -march=native -flto")
endif ()
elseif (CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -O2 -g")
elseif (CMAKE_BUILD_TYPE STREQUAL "MinSizeRel")
Expand Down
12 changes: 11 additions & 1 deletion cpp/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ message("cmake using: ENABLE_LZOKAY=${ENABLE_LZOKAY}")
option(ENABLE_ZLIB "Enable Zlib compression" ON)
message("cmake using: ENABLE_ZLIB=${ENABLE_ZLIB}")

# ENABLE_SIMD is defined in the top-level CMakeLists.txt
message("cmake using: ENABLE_SIMD=${ENABLE_SIMD}")

message("Running in src directory")
if (${COV_ENABLED})
add_compile_options(-fprofile-arcs -ftest-coverage)
Expand Down Expand Up @@ -89,6 +92,13 @@ if (ENABLE_ANTLR4)
message("Adding ANTLR4 include directory")
endif()

if (ENABLE_SIMD)
add_definitions(-DENABLE_SIMD)
list(APPEND PROJECT_INCLUDE_DIR
${CMAKE_SOURCE_DIR}/third_party/simde-0.8.4-rc3
)
endif()

include_directories(${PROJECT_INCLUDE_DIR})

# Mark every translation unit that is compiled into the tsfile library so that
Expand Down Expand Up @@ -171,4 +181,4 @@ set_target_properties(tsfile PROPERTIES SOVERSION ${LIBTSFILE_SO_VERSION})
install(TARGETS tsfile
RUNTIME DESTINATION ${LIBRARY_OUTPUT_PATH}
LIBRARY DESTINATION ${LIBRARY_OUTPUT_PATH}
ARCHIVE DESTINATION ${LIBRARY_OUTPUT_PATH})
ARCHIVE DESTINATION ${LIBRARY_OUTPUT_PATH})
4 changes: 0 additions & 4 deletions cpp/src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ add_library(common_obj OBJECT ${common_SRC_LIST}
${common_mutex_SRC_LIST}
${common_datatype_SRC_LIST})

if (ENABLE_ANTLR4)
target_compile_definitions(common_obj PRIVATE ENABLE_ANTLR4)
endif()

# install header files recursively
file(GLOB_RECURSE HEADERS "${CMAKE_CURRENT_SOURCE_DIR}/*.h")
copy_to_dir(${HEADERS} "common_obj")
24 changes: 16 additions & 8 deletions cpp/src/common/allocator/alloc_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,35 +82,43 @@ class ModStat {
}
void init();
void destroy();
INLINE void update_alloc(AllocModID mid, int32_t size) {
INLINE void update_alloc(AllocModID mid, int64_t size) {
#ifdef ENABLE_MEM_STAT
ASSERT(mid < __LAST_MOD_ID);
ATOMIC_FAA(get_item(mid), size);
#endif
}
void update_free(AllocModID mid, uint32_t size) {
void update_free(AllocModID mid, uint64_t size) {
#ifdef ENABLE_MEM_STAT
ASSERT(mid < __LAST_MOD_ID);
ATOMIC_FAA(get_item(mid), 0 - size);
ATOMIC_FAA(get_item(mid), -static_cast<int64_t>(size));
#endif
}
void print_stat();

int64_t get_stat(int8_t mid) {
#ifdef ENABLE_MEM_STAT
if (stat_arr_ != NULL && mid < __LAST_MOD_ID)
return ATOMIC_FAA(get_item(mid), 0LL);
#endif
return 0;
}

#ifdef ENABLE_TEST
int32_t TEST_get_stat(int8_t mid) { return ATOMIC_FAA(get_item(mid), 0); }
int64_t TEST_get_stat(int8_t mid) { return ATOMIC_FAA(get_item(mid), 0LL); }
#endif

private:
INLINE int32_t* get_item(int8_t mid) {
return &(stat_arr_[mid * (ITEM_SIZE / sizeof(int32_t))]);
INLINE int64_t* get_item(int8_t mid) {
return &(stat_arr_[mid * (ITEM_SIZE / sizeof(int64_t))]);
}

private:
static const int32_t ITEM_SIZE = CACHE_LINE_SIZE;
static const int32_t ITEM_COUNT = __LAST_MOD_ID;
int32_t* stat_arr_;
int64_t* stat_arr_;

STATIC_ASSERT((ITEM_SIZE % sizeof(int32_t) == 0), ModStat_ITEM_SIZE_ERROR);
STATIC_ASSERT((ITEM_SIZE % sizeof(int64_t) == 0), ModStat_ITEM_SIZE_ERROR);
};

/* base allocator */
Expand Down
69 changes: 43 additions & 26 deletions cpp/src/common/allocator/byte_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ class ByteStream {
};

public:
static const uint32_t DEFAULT_PAGE_SIZE = 1024;

ByteStream(uint32_t page_size, AllocModID mid, bool enable_atomic = false,
BaseAllocator& allocator = g_base_allocator)
: allocator_(allocator),
Expand All @@ -263,10 +265,9 @@ class ByteStream {
read_pos_(0),
marked_read_pos_(0),
page_size_(page_size),
page_mask_(page_size - 1),
mid_(mid),
wrapped_page_(false, nullptr) {
// assert(page_size >= 16); // commented out by gxh on 2023.03.09
}
wrapped_page_(false, nullptr) {}

// for wrap plain buffer to ByteStream
ByteStream(AllocModID mid = MOD_DEFAULT)
Expand All @@ -278,6 +279,7 @@ class ByteStream {
read_pos_(0),
marked_read_pos_(0),
page_size_(0),
page_mask_(0),
mid_(mid),
wrapped_page_(false, nullptr) {}

Expand All @@ -290,7 +292,14 @@ class ByteStream {
wrapped_page_.next_.store(nullptr);
wrapped_page_.buf_ = (uint8_t*)buf;

page_size_ = buf_len;
// page_mask_ is used as a bitmask and only works correctly for
// power-of-2 page sizes. Round up to the next power-of-2 so that
// (read_pos_ & page_mask_) gives the correct within-page offset and
// the page-crossing check doesn't misfire on arbitrary buffer sizes.
uint32_t ps = 1;
while (ps < (uint32_t)buf_len) ps <<= 1;
page_size_ = ps;
page_mask_ = ps - 1;
head_.store(&wrapped_page_);
Comment on lines +295 to 303
tail_.store(&wrapped_page_);
total_size_.store(buf_len);
Expand Down Expand Up @@ -339,13 +348,14 @@ class ByteStream {
// never used TODO
void shallow_clone_from(ByteStream& other) {
this->page_size_ = other.page_size_;
this->page_mask_ = other.page_mask_;
this->mid_ = other.mid_;
this->head_.store(other.head_.load());
this->tail_.store(other.tail_.load());
this->total_size_.store(other.total_size_.load());
}

FORCE_INLINE uint32_t total_size() const { return total_size_.load(); }
FORCE_INLINE uint64_t total_size() const { return total_size_.load(); }
FORCE_INLINE uint32_t read_pos() const { return read_pos_; };
/**
* Seek the read cursor to an absolute offset. Re-anchors read_page_ for
Expand Down Expand Up @@ -380,10 +390,10 @@ class ByteStream {
std::cout << "write_buf error " << ret << std::endl;
return ret;
}
uint32_t remainder = page_size_ - (total_size_.load() % page_size_);
uint32_t remainder = page_size_ - (total_size_.load() & page_mask_);
uint32_t copy_len =
remainder < (len - write_len) ? remainder : (len - write_len);
memcpy(tail_.load()->buf_ + total_size_.load() % page_size_,
memcpy(tail_.load()->buf_ + (total_size_.load() & page_mask_),
buf + write_len, copy_len);
total_size_.atomic_aaf(copy_len);
write_len += copy_len;
Expand All @@ -404,11 +414,11 @@ class ByteStream {
if (RET_FAIL(check_space())) {
return ret;
}
uint32_t remainder = page_size_ - (read_pos_ % page_size_);
uint32_t remainder = page_size_ - (read_pos_ & page_mask_);
uint32_t copy_len = remainder < want_len_limited - read_len
? remainder
: want_len_limited - read_len;
memcpy(buf + read_len, read_page_->buf_ + (read_pos_ % page_size_),
memcpy(buf + read_len, read_page_->buf_ + (read_pos_ & page_mask_),
copy_len);
read_len += copy_len;
read_pos_ += copy_len;
Expand Down Expand Up @@ -460,16 +470,17 @@ class ByteStream {
return b;
}
b.buf_ =
(char*)(tail_.load()->buf_ + (total_size_.load() % page_size_));
b.len_ = page_size_ - (total_size_.load() % page_size_);
(char*)(tail_.load()->buf_ + (total_size_.load() & page_mask_));
b.len_ = page_size_ - (total_size_.load() & page_mask_);
return b;
}

void buffer_used(uint32_t used_bytes) {
ASSERT(used_bytes >= 1);
// would not span page
ASSERT((total_size_.load() / page_size_) ==
((total_size_.load() + used_bytes - 1) / page_size_));
ASSERT(page_size_ == 0 ||
(total_size_.load() / page_size_) ==
((total_size_.load() + used_bytes - 1) / page_size_));
total_size_.atomic_aaf(used_bytes);
}

Expand All @@ -485,7 +496,7 @@ class ByteStream {
if (RET_FAIL(prepare_space())) {
return ret;
}
uint32_t remainder = page_size_ - (total_size_.load() % page_size_);
uint32_t remainder = page_size_ - (total_size_.load() & page_mask_);
uint32_t step =
remainder < (len - advanced) ? remainder : (len - advanced);
total_size_.atomic_aaf(step);
Expand All @@ -504,6 +515,7 @@ class ByteStream {
Page* cur_;
Page* end_;
int64_t total_size_;
int64_t consumed_ = 0;
BufferIterator(const ByteStream& bs) : host_(bs) {
cur_ = bs.head_.load();
end_ = bs.tail_.load();
Expand All @@ -514,13 +526,17 @@ class ByteStream {
Buffer b;
if (cur_ != nullptr) {
b.buf_ = (char*)cur_->buf_;
if (cur_ == end_ &&
host_.total_size_.load() % host_.page_size_ != 0) {
b.len_ = host_.total_size_.load() % host_.page_size_;
if (cur_ == end_) {
// Last page: clamp to remaining total_size_. For wrapped
// streams page_size_ may have been rounded up past the
// user buffer (see wrap_from), so we must not return
// page_size_ as the length here.
b.len_ = static_cast<uint32_t>(total_size_ - consumed_);
} else {
b.len_ = host_.page_size_;
}
ASSERT(b.len_ > 0);
consumed_ += b.len_;
cur_ = cur_->next_.load();
}
return b;
Expand Down Expand Up @@ -562,7 +578,7 @@ class ByteStream {

// get tail position <tail_, total_size_> atomically
Page* host_end = nullptr;
uint32_t host_total_size = 0;
uint64_t host_total_size = 0;
while (true) {
host_end = host_.tail_.load();
host_total_size = host_.total_size_.load();
Expand All @@ -573,7 +589,7 @@ class ByteStream {

while (true) {
if (cur_ == host_end) {
if (host_total_size % host_.page_size_ == 0) {
if ((host_total_size & host_.page_mask_) == 0) {
if (read_offset_within_cur_page_ == host_.page_size_) {
return b;
} else {
Expand All @@ -587,15 +603,15 @@ class ByteStream {
}
} else {
if (read_offset_within_cur_page_ ==
(host_total_size % host_.page_size_)) {
(host_total_size & host_.page_mask_)) {
return b;
} else {
b.buf_ = ((char*)(cur_->buf_)) +
read_offset_within_cur_page_;
b.len_ = (host_total_size % host_.page_size_) -
b.len_ = (host_total_size & host_.page_mask_) -
read_offset_within_cur_page_;
read_offset_within_cur_page_ =
(host_total_size % host_.page_size_);
(host_total_size & host_.page_mask_);
total_end_offset_ += b.len_;
return b;
}
Expand Down Expand Up @@ -625,7 +641,7 @@ class ByteStream {
FORCE_INLINE int prepare_space() {
int ret = common::E_OK;
if (UNLIKELY(tail_.load() == nullptr ||
total_size_.load() % page_size_ == 0)) {
(total_size_.load() & page_mask_) == 0)) {
Page* p = nullptr;
if (RET_FAIL(alloc_page(p))) {
return ret;
Expand All @@ -642,7 +658,7 @@ class ByteStream {
}
if (UNLIKELY(read_page_ == nullptr)) {
read_page_ = head_.load();
} else if (UNLIKELY(read_pos_ % page_size_ == 0)) {
} else if (UNLIKELY((read_pos_ & page_mask_) == 0)) {
read_page_ = read_page_->next_.load();
}
if (UNLIKELY(read_page_ == nullptr)) {
Expand Down Expand Up @@ -678,10 +694,11 @@ class ByteStream {
OptionalAtomic<Page*> head_;
OptionalAtomic<Page*> tail_;
Page* read_page_; // only one thread is allow to reader this ByteStream
OptionalAtomic<uint32_t> total_size_; // total size in byte
OptionalAtomic<uint64_t> total_size_; // total size in byte
uint32_t read_pos_; // current reader position
uint32_t marked_read_pos_; // current reader position
uint32_t page_size_;
uint32_t page_mask_; // page_size_ - 1, for bitwise AND instead of modulo
AllocModID mid_;

public:
Expand Down Expand Up @@ -1181,14 +1198,14 @@ class SerializationUtil {
// indicates that memory has been allocated and must be freed.
FORCE_INLINE static int read_var_char_ptr(std::string*& str,
ByteStream& in) {
str = nullptr;
int ret = common::E_OK;
int32_t len = 0;
int32_t read_len = 0;
if (RET_FAIL(read_var_int(len, in))) {
return ret;
} else {
if (len == storage::NO_STR_TO_READ) {
str = nullptr;
return ret;
} else {
char* tmp_buf =
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/common/allocator/mem_alloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void* mem_alloc(uint32_t size, AllocModID mid) {
auto high4b = static_cast<uint32_t>(header >> 32);
*reinterpret_cast<uint32_t*>(raw) = high4b;
*reinterpret_cast<uint32_t*>(raw + 4) = low4b;
ModStat::get_instance().update_alloc(mid, static_cast<int32_t>(size));
ModStat::get_instance().update_alloc(mid, static_cast<int64_t>(size));
return raw + header_size;
}

Expand Down Expand Up @@ -158,17 +158,17 @@ void* mem_realloc(void* ptr, uint32_t size) {
*reinterpret_cast<uint32_t*>(p) = high4b;
*reinterpret_cast<uint32_t*>(p + 4) = low4b;
ModStat::get_instance().update_alloc(
mid, int32_t(size) - int32_t(original_size));
mid, int64_t(size) - int64_t(original_size));
return p + ALIGNMENT;
}

void ModStat::init() {
if (stat_arr_ != NULL) {
return;
}
stat_arr_ = (int32_t*)(::malloc(ITEM_SIZE * ITEM_COUNT));
stat_arr_ = (int64_t*)(::malloc(ITEM_SIZE * ITEM_COUNT));
for (int8_t i = 0; i < __LAST_MOD_ID; i++) {
int32_t* item = get_item(i);
int64_t* item = get_item(i);
*item = 0;
}
}
Expand All @@ -183,14 +183,14 @@ void ModStat::print_stat() {

struct Entry {
const char* name;
int32_t val;
int64_t val;
};
Entry entries[__LAST_MOD_ID];
int count = 0;
int64_t total = 0;

for (int i = 0; i < __LAST_MOD_ID; i++) {
int32_t val = ATOMIC_FAA(get_item(i), 0);
int64_t val = ATOMIC_FAA(get_item(i), 0LL);
total += val;
if (val != 0) {
entries[count++] = {g_mod_names[i], val};
Expand Down
Loading
Loading