Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
ksergey committed Oct 1, 2024
1 parent 7575d1d commit 2398ff4
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 63 deletions.
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ endif()

include(cmake/TurboQHelpers.cmake)

if(NOT TARGET fmt::fmt)
add_subdirectory(deps/fmt)
endif()
if(NOT TARGET Boost::outcome)
add_subdirectory(deps/boost_outcome)
endif()
if(NOT TARGET Boost::scope_exit)
add_subdirectory(deps/boost_scope_exit)
endif()
if(NOT TARGET doctest::doctest_with_main)
add_subdirectory(deps/doctest)
endif()
Expand Down
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[<img src="https://img.shields.io/github/license/ksergey/turboq">](https://opensource.org/license/agpl-v3)
[<img src="https://img.shields.io/github/actions/workflow/status/ksergey/turboq/build-and-test.yml?logo=linux">](https://github.com/ksergey/turboq/actions/workflows/build-and-test.yml)
[<img src="https://img.shields.io/badge/language-C%2B%2B20-red">](https://en.wikipedia.org/wiki/C%2B%2B23)
[<img src="https://img.shields.io/badge/language-C%2B%2B20-red">](https://en.wikipedia.org/wiki/C%2B%2B20)

## turboq: message queues for low latency inter-process communications

Expand All @@ -13,5 +13,9 @@

## Requirements

- C++23 (gcc-14+, clang-18+)
- benchmark, doctest
- C++20 (gcc-14+, clang-18+)
- benchmark, doctest, fmt, boost

<!--
perf stat -e cache-references,cache-misses,L1-dcache-prefetches,instructions,cpu-cycles,branches,branch-misses,duration_time ./turboq-BoundedSPSCRawQueue-test
-->
4 changes: 3 additions & 1 deletion code/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ set(TargetName turboq)

add_library(${TargetName})
target_compile_features(${TargetName}
PUBLIC cxx_std_23)
PUBLIC cxx_std_20)
set_target_properties(${TargetName}
PROPERTIES
CXX_STANDARD_REQUIRED ON
Expand All @@ -11,6 +11,8 @@ target_compile_options(${TargetName}
PUBLIC -Wall -Wextra -Wattributes -Wpedantic -Wstrict-aliasing -Wcast-align -g)
target_include_directories(${TargetName}
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(${TargetName}
PUBLIC fmt::fmt Boost::outcome Boost::scope_exit)

if (TurboQMasterProject)
target_compile_options(${TargetName}
Expand Down
17 changes: 13 additions & 4 deletions code/turboq/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,19 @@
#include <sys/types.h>
#include <unistd.h>

#include <fmt/format.h>

#include <cassert>
#include <print>
#include <system_error>

#ifndef MFD_CLOEXEC
int memfd_create(const char* name, unsigned int flags) {
// Shouldn't work on linux before 3.17
return syscall(__NR_memfd_create, name, flags);
}
#define MFD_CLOEXEC FD_CLOEXEC
#endif

namespace turboq {
namespace {

Expand Down Expand Up @@ -83,7 +92,7 @@ File::~File() noexcept {
auto const fd = fd_;
if (auto const result = closeNoThrow(); !result) {
if (result.error().value() == EBADF) {
std::print(stderr, "turboq: closing fd {}, it may already have been closed\n", fd);
fmt::print(stderr, "turboq: closing fd {}, it may already have been closed\n", fd);
}
}
}
Expand All @@ -102,7 +111,7 @@ Result<> File::closeNoThrow() noexcept {
if (rc != 0) {
return makePosixErrorCode(errno);
} else {
return {};
return success();
}
}

Expand Down Expand Up @@ -183,7 +192,7 @@ Result<> File::tryTruncate(std::size_t size) const noexcept {
if (::ftruncate(this->get(), size) == -1) {
return makePosixErrorCode(errno);
}
return {};
return success();
}

void File::truncate(std::size_t size) const {
Expand Down
6 changes: 3 additions & 3 deletions code/turboq/MappedRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@

#include "MappedRegion.h"

#include <print>

#include <sys/mman.h>

#include <fmt/format.h>

namespace turboq {

MappedRegion::~MappedRegion() noexcept {
if (size_ > 0) {
if (::munmap(data_, size_) != 0) {
std::print(stderr, "closing mapped region, it may be already unmapped\n");
fmt::print(stderr, "closing mapped region, it may be already unmapped\n");
}
}
}
Expand Down
26 changes: 8 additions & 18 deletions code/turboq/MemorySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,16 @@
#include <bit>
#include <cassert>
#include <charconv>
#include <format>
#include <print>
#include <ranges>
#include <regex>
#include <string_view>
#include <system_error>
#include <vector>

namespace turboq {

template <typename Fn>
struct [[nodiscard]] AtScopeExit : Fn {
AtScopeExit(Fn&& fn) : Fn(std::forward<Fn>(fn)) {}
~AtScopeExit() noexcept {
(*this)();
}
};
template <typename Fn>
AtScopeExit(Fn&& fn) -> AtScopeExit<Fn>;
#include <boost/scope_exit.hpp>
#include <fmt/format.h>

namespace turboq {
namespace {

std::size_t const gDefaultPageSize = ::sysconf(_SC_PAGESIZE);
Expand All @@ -50,12 +40,12 @@ Result<std::size_t> getDefaultHugePageSize() noexcept {
char* line = nullptr;
std::size_t len = 0;

auto atExit = AtScopeExit([&] {
BOOST_SCOPE_EXIT_ALL(&) {
::fclose(handle);
if (line) {
::free(line);
}
});
};

std::cmatch match;

Expand Down Expand Up @@ -112,9 +102,9 @@ std::vector<MemoryMountPoint> readProcMounts() {
throw std::system_error(ENOENT, getPosixErrorCategory(), "setmntent(...)");
}

auto atExit = AtScopeExit([&] {
BOOST_SCOPE_EXIT_ALL(&) {
::endmntent(handle);
});
};

std::vector<MemoryMountPoint> entries;

Expand All @@ -137,7 +127,7 @@ std::vector<MemoryMountPoint> readProcMounts() {
if (defaultHugePageSize) {
pageSize = defaultHugePageSize;
} else {
std::print(stderr, "turboq: pagesize option error for mount point \"{}\" ({}): {}\n", mntent.mnt_dir,
fmt::print(stderr, "turboq: pagesize option error for mount point \"{}\" ({}): {}\n", mntent.mnt_dir,
mntent.mnt_fsname, pageSize.error().message());
continue;
}
Expand Down
29 changes: 13 additions & 16 deletions code/turboq/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,9 @@
#include <exception>
#include <system_error>

#include <turboq/platform.h>
#include <boost/outcome.hpp>

#if __cpp_concepts >= 202002L
#include <expected>
#elif __clang_major__ >= 17
#define turboq_save__cpp_concepts
#pragma clang diagnostic ignored "-Wbuiltin-macro-redefined"
#define __cpp_concepts 202002L
#include <expected>
#pragma clang diagnostic ignored "-Wmacro-redefined"
#define __cpp_concepts turboq_save__cpp_concepts
#undef turboq_save__cpp_concepts
#endif
#include <turboq/platform.h>

namespace turboq {

Expand All @@ -45,13 +35,20 @@ TURBOQ_FORCE_INLINE std::error_category const& getPosixErrorCategory() noexcept
return errorCategory;
}

/// Optional with failure reason.
/// @see boost::outcome
/// mimic: std::expected from c++23
template <class T = void, class E = std::error_code>
using Result = std::expected<T, E>;
using Result = BOOST_OUTCOME_V2_NAMESPACE::std_result<T, E>;

/// @see boost::outcome
using BOOST_OUTCOME_V2_NAMESPACE::success;

/// @see boost::outcome
using BOOST_OUTCOME_V2_NAMESPACE::failure;

/// Return ErrorCode with posix error
TURBOQ_FORCE_INLINE std::unexpected<std::error_code> makePosixErrorCode(int ec) noexcept {
return std::unexpected<std::error_code>(std::error_code(ec, getPosixErrorCategory()));
TURBOQ_FORCE_INLINE decltype(auto) makePosixErrorCode(int ec) noexcept {
return failure(std::error_code(ec, getPosixErrorCategory()));
}

} // namespace turboq
7 changes: 3 additions & 4 deletions code/turboq/benchmark3_bm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
#include <cassert>
#include <cmath>
#include <cstdint>
#include <format>
#include <numeric>
#include <print>
#include <thread>
#include <vector>

#include <benchmark/benchmark.h>
#include <fmt/format.h>

#include "BoundedMPSCRawQueue.h"
#include "BoundedSPMCRawQueue.h"
Expand Down Expand Up @@ -117,7 +116,7 @@ inline void bindCurrentThreadToCore(int coreNo) noexcept {

auto const rc = ::pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
if (rc != 0) {
std::print(stderr, "failed to bind current thread to core: {}\n", ::strerror(rc));
fmt::print(stderr, "failed to bind current thread to core: {}\n", ::strerror(rc));
}
}

Expand Down Expand Up @@ -213,7 +212,7 @@ static void BM_EnqueueDequeue(::benchmark::State& state) {
std::uint64_t const expected = (Ops) * (Ops - 1) / 2;
std::uint64_t const actual = sum.load();
if (expected != actual) {
state.SkipWithError(std::format("Expected sum {}, got {}", expected, actual));
state.SkipWithError(fmt::format("Expected sum {}, got {}", expected, actual));
}
};
return runOnce<ProducersCount, ConsumersCount, BindToCoreT>(produceFn, consumeFn, endFn);
Expand Down
4 changes: 2 additions & 2 deletions code/turboq/concepts.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ namespace turboq {

/// Checks T is Producer type
template <typename T>
concept TurboQProducer = requires(T obj, std::size_t size) {
concept Producer = requires(T obj, std::size_t size) {
{ obj.prepare(size) } -> std::same_as<std::span<std::byte>>;
{ obj.commit() } -> std::same_as<void>;
{ obj.commit(size) } -> std::same_as<void>;
};

/// Checks T is Consumer type
template <typename T>
concept TurboQConsumer = requires(T obj) {
concept Consumer = requires(T obj) {
{ obj.fetch() } -> std::same_as<std::span<std::byte const>>;
{ obj.consume() } -> std::same_as<void>;
{ obj.reset() } -> std::same_as<void>;
Expand Down
3 changes: 3 additions & 0 deletions code/turboq/detail/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

namespace turboq::detail {

/// Map file to memory
MappedRegion mapFile(File const& file, std::size_t fileSize);

/// \overload
MappedRegion mapFile(File const& file);

} // namespace turboq::detail
24 changes: 12 additions & 12 deletions code/turboq/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,40 @@

namespace turboq {

template <typename Producer, typename Data>
requires TurboQProducer<Producer> and std::is_trivially_copyable_v<Data>
TURBOQ_FORCE_INLINE bool enqueue(Producer& producer, Data const& data) {
template <typename ProducerT, typename DataT>
requires Producer<ProducerT> and std::is_trivially_copyable_v<DataT>
TURBOQ_FORCE_INLINE bool enqueue(ProducerT& producer, DataT const& data) {
auto buffer = producer.prepare(sizeof(data));
if (buffer.empty()) {
return false;
}

*std::bit_cast<Data*>(buffer.data()) = data;
*std::bit_cast<DataT*>(buffer.data()) = data;
producer.commit();

return true;
}

template <typename Consumer, typename Data>
requires TurboQConsumer<Consumer> and std::is_trivially_copyable_v<Data>
TURBOQ_FORCE_INLINE bool dequeue(Consumer& consumer, Data& data) {
template <typename ConsumerT, typename DataT>
requires Consumer<ConsumerT> and std::is_trivially_copyable_v<DataT>
TURBOQ_FORCE_INLINE bool dequeue(ConsumerT& consumer, DataT& data) {
auto buffer = consumer.fetch();
if (buffer.empty()) {
return false;
}
data = *std::bit_cast<Data const*>(buffer.data());
data = *std::bit_cast<DataT const*>(buffer.data());
consumer.consume();
return true;
}

template <typename Consumer, typename Data>
requires TurboQConsumer<Consumer> and std::is_trivially_copyable_v<Data>
TURBOQ_FORCE_INLINE bool fetch(Consumer& consumer, Data& data) {
template <typename ConsumerT, typename DataT>
requires Consumer<ConsumerT> and std::is_trivially_copyable_v<DataT>
TURBOQ_FORCE_INLINE bool fetch(ConsumerT& consumer, DataT& data) {
auto buffer = consumer.fetch();
if (buffer.empty()) {
return false;
}
data = *std::bit_cast<Data const*>(buffer.data());
data = *std::bit_cast<DataT const*>(buffer.data());
return true;
}

Expand Down
16 changes: 16 additions & 0 deletions deps/boost_outcome/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include(FetchContent)
FetchContent_Declare(Boost
URL https://github.com/boostorg/boost/releases/download/boost-1.86.0/boost-1.86.0-cmake.tar.xz
EXCLUDE_FROM_ALL
DOWNLOAD_EXTRACT_TIMESTAMP ON
)
FetchContent_MakeAvailable(Boost)

if (NOT TARGET Boost::outcome)
if (NOT TARGET Boost::boost)
message(FATAL_ERROR "target Boost::outcome not found")
endif()
add_library(boost_outcome INTERFACE)
target_link_libraries(boost_outcome INTERFACE Boost::boost)
add_library(Boost::outcome ALIAS boost_outcome)
endif()
16 changes: 16 additions & 0 deletions deps/boost_scope_exit/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
include(FetchContent)
FetchContent_Declare(Boost
URL https://github.com/boostorg/boost/releases/download/boost-1.86.0/boost-1.86.0-cmake.tar.xz
EXCLUDE_FROM_ALL
DOWNLOAD_EXTRACT_TIMESTAMP ON
)
FetchContent_MakeAvailable(Boost)

if (NOT TARGET Boost::scope_exit)
if (NOT TARGET Boost::boost)
message(FATAL_ERROR "target Boost::scope_exit not found")
endif()
add_library(boost_scope_exit INTERFACE)
target_link_libraries(boost_scope_exit INTERFACE Boost::boost)
add_library(Boost::scope_exit ALIAS boost_scope_exit)
endif()
9 changes: 9 additions & 0 deletions deps/fmt/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# does find_package(fmt) exist?

include(FetchContent)
FetchContent_Declare(fmt
URL https://github.com/fmtlib/fmt/archive/refs/tags/10.2.1.tar.gz
EXCLUDE_FROM_ALL
DOWNLOAD_EXTRACT_TIMESTAMP ON
)
FetchContent_MakeAvailable(fmt)

0 comments on commit 2398ff4

Please sign in to comment.