Skip to content

Commit

Permalink
some code for python
Browse files Browse the repository at this point in the history
  • Loading branch information
ksergey committed Oct 4, 2024
1 parent 56ab859 commit 07de427
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 24 deletions.
17 changes: 8 additions & 9 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ option(TURBOQ_SANITIZER "Build with sanitizer" OFF)

include(cmake/TurboQHelpers.cmake)

if(NOT TARGET fmt::fmt)
if (NOT TARGET fmt::fmt-header-only)
add_subdirectory(deps/fmt)
endif()
if(NOT TARGET Boost::outcome)
if (NOT TARGET Boost::outcome)
add_subdirectory(deps/boost_outcome)
endif()
if(NOT TARGET Boost::scope_exit)
if (NOT TARGET Boost::scope_exit)
add_subdirectory(deps/boost_scope_exit)
endif()
if(NOT TARGET doctest::doctest_with_main)
if (NOT TARGET doctest::doctest_with_main)
add_subdirectory(deps/doctest)
endif()
if(NOT TARGET benchmark::benchmark_main)
if (NOT TARGET benchmark::benchmark_main)
add_subdirectory(deps/benchmark)
endif()
if(NOT TARGET cxxopts::cxxopts)
if (NOT TARGET cxxopts::cxxopts)
add_subdirectory(deps/cxxopts)
endif()

Expand All @@ -31,10 +31,9 @@ enable_testing()
add_subdirectory(code)
add_subdirectory(examples)

if(TURBOQ_PYTHON)
if(NOT COMMAND nanobind_add_module)
if (TURBOQ_PYTHON)
if (NOT COMMAND nanobind_add_module)
add_subdirectory(deps/nanobind)
endif()

add_subdirectory(python)
endif()
7 changes: 6 additions & 1 deletion code/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ target_compile_options(${TargetName}
target_include_directories(${TargetName}
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(${TargetName}
PUBLIC fmt::fmt Boost::outcome Boost::scope_exit)
PUBLIC fmt::fmt-header-only Boost::outcome Boost::scope_exit)

if (TURBOQ_PYTHON)
set_property(TARGET ${TargetName}
PROPERTY POSITION_INDEPENDENT_CODE ON)
endif()

if (TURBOQ_SANITIZER)
target_compile_options(${TargetName}
Expand Down
13 changes: 7 additions & 6 deletions deps/boost_outcome/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
if (TARGET Boost::boost)
add_library(boost_outcome INTERFACE)
target_link_libraries(boost_outcome INTERFACE Boost::boost)
add_library(Boost::outcome ALIAS boost_outcome)
endif()

include(FetchContent)
FetchContent_Declare(Boost
URL https://github.com/boostorg/boost/releases/download/boost-1.86.0/boost-1.86.0-cmake.tar.xz
Expand All @@ -7,10 +13,5 @@ FetchContent_Declare(Boost
FetchContent_MakeAvailable(Boost)

if (NOT TARGET Boost::outcome)
if (NOT TARGET Boost::boost)
message(FATAL_ERROR "target Boost::outcome not found")
endif()
add_library(boost_outcome INTERFACE)
target_link_libraries(boost_outcome INTERFACE Boost::boost)
add_library(Boost::outcome ALIAS boost_outcome)
message(FATAL_ERROR "target Boost::outcome not found")
endif()
13 changes: 13 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
set(TargetName spsc_pub)

add_executable(${TargetName} ${TargetName}.cpp)
target_compile_features(${TargetName}
PUBLIC cxx_std_20)
set_target_properties(${TargetName}
PROPERTIES
CXX_STANDARD_REQUIRED ON
CXX_EXTENSIONS OFF)
target_compile_options(${TargetName}
PUBLIC -Wall -Wextra -Wattributes -Wpedantic -Wstrict-aliasing -Wcast-align -g)
target_link_libraries(${TargetName}
PUBLIC fmt::fmt turboq::turboq)
36 changes: 36 additions & 0 deletions examples/spsc_pub.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include <iostream>
#include <string>

#include <fmt/format.h>

#include <turboq/BoundedSPSCRawQueue.h>
#include <turboq/utils.h>

int main(int argc, char* argv[]) {
try {
char const* queueName = "turboq.spsc";
if (argc > 1) {
queueName = argv[1];
}

auto producer = turboq::BoundedSPSCRawQueue(queueName, {5 * 1024 * 1024}).createProducer();

for (std::string line; std::getline(std::cin, line);) {
if (line.empty()) {
continue;
}

auto result = producer.prepare(line.size());
if (result.empty()) {
throw std::runtime_error("failed to prepare buffer to send");
}
std::memcpy(result.data(), line.data(), line.size());
producer.commit();
}

} catch (std::exception const& e) {
fmt::print(stderr, "ERROR: {}\n", e.what());
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
2 changes: 2 additions & 0 deletions python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ nanobind_add_module(${TargetName}

target_link_libraries(${TargetName}
PRIVATE turboq::turboq)

file(COPY dump.py DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
8 changes: 8 additions & 0 deletions python/dump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import pyturboq

consumer = pyturboq.create_consumer('turboq.spsc')

while True:
str = consumer.dequeue()
if len(str) > 0:
print(str)
52 changes: 44 additions & 8 deletions python/turboq.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,55 @@
// Copyright (c) Sergey Kovalevich <inndie@gmail.com>
// SPDX-License-Identifier: AGPL-3.0

#include <string>
#include <string_view>

#include <nanobind/nanobind.h>
#include <nanobind/stl/string.h>
#include <nanobind/stl/string_view.h>
#include <nanobind/stl/vector.h>

#include <turboq/BoundedMPSCRawQueue.h>
#include <turboq/BoundedSPMCRawQueue.h>
#include <turboq/BoundedSPSCRawQueue.h>

NAMESPACE_BEGIN(NB_NAMESPACE)
NAMESPACE_BEGIN(detail)

template <class T, std::size_t Extent>
struct type_caster<std::span<T, Extent>> : list_caster<std::span<T, Extent>, T> {};

NAMESPACE_END(detail)
NAMESPACE_END(NB_NAMESPACE)

using namespace nanobind::literals;

NB_MODULE(pyturboq, m) {
m.def(
"sample",
[](int a, int b = 1) {
return a + b;
},
"a"_a, "b"_a = 1, "sample text");
#if 0
nanobind::class_<turboq::MappedRegion>(m, "MappedRegion");
#endif

nanobind::class_<turboq::BoundedSPSCRawQueue::Producer>(m, "BoundedSPSCRawQueueProducer")
.def("enqueue", [](turboq::BoundedSPSCRawQueue::Producer& self, std::string_view s) -> bool {
auto result = self.prepare(s.size());
if (result.empty()) {
return false;
}
std::memcpy(result.data(), s.data(), s.size());
self.commit();
return true;
});

nanobind::class_<turboq::BoundedSPSCRawQueue::Consumer>(m, "BoundedSPSCRawQueueConsumer")
.def("dequeue", [](turboq::BoundedSPSCRawQueue::Consumer& self) -> std::string {
auto result = self.fetch();
if (!result.empty()) {
std::string data(std::bit_cast<char const*>(result.data()), result.size());
self.consume();
return data;
}
return std::string();
});

m.def("create_consumer", [](std::string_view name) -> turboq::BoundedSPSCRawQueue::Consumer {
return turboq::BoundedSPSCRawQueue(name).createConsumer();
});
}

0 comments on commit 07de427

Please sign in to comment.