diff --git a/README.md b/README.md index ce0b5ab5b..ee47366cb 100644 --- a/README.md +++ b/README.md @@ -3,24 +3,23 @@ The Reactive Extensions for Native (__RxCpp__) is a library for composing asynch Windows: [![Windows Status](http://img.shields.io/appveyor/ci/kirkshoop/RxCpp-446.svg?style=flat-square)](https://ci.appveyor.com/project/kirkshoop/rxcpp-446) Linux & OSX: [![Linux & Osx Status](http://img.shields.io/travis/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://travis-ci.org/Reactive-Extensions/RxCpp) +[![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp) + +[![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +[![doxygen documentation](https://img.shields.io/badge/documentation-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp) + Github: [![GitHub release](https://img.shields.io/github/release/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp/releases) [![GitHub commits](https://img.shields.io/github/commits-since/Reactive-Extensions/RxCpp/v2.2.0.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp) NuGet: [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) [![NuGet downloads](http://img.shields.io/nuget/dt/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) -[![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp) - -[![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) -[![doxygen documentation](https://img.shields.io/badge/documentation-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp) - #Example Add ```Rx/v2/src``` to the include paths [![lines from bytes](https://img.shields.io/badge/blog%20post-lines%20from%20bytes-blue.svg?style=flat-square)](http://kirkshoop.github.io/async/rxcpp/c++/2015/07/07/rxcpp_-_parsing_bytes_to_lines_of_text.html) ```cpp - #include "rxcpp/rx.hpp" using namespace rxcpp; using namespace rxcpp::sources; @@ -31,41 +30,42 @@ using namespace rxcpp::util; #include using namespace std; -//using rxcpp::operators::sum; - int main() { random_device rd; // non-deterministic generator mt19937 gen(rd()); uniform_int_distribution<> dist(4, 18); - // produce byte stream that contains lines of text + // for testing purposes, produce byte stream that from lines of text auto bytes = range(1, 10) | - flat_map([&](int i){ - return from( - from((uint8_t)('A' + i)) | - repeat(dist(gen)), - from((uint8_t)'\r')) | - concat(); + flat_map([&](int i){ + auto body = from((uint8_t)('A' + i)) | + repeat(dist(gen)) | + as_dynamic(); + auto delim = from((uint8_t)'\r'); + return from(body, delim) | concat(); }) | window(17) | - flat_map([](observable w){ + flat_map([](observable w){ return w | reduce( - vector(), + vector(), [](vector& v, uint8_t b){ - v.push_back(b); + v.push_back(b); return move(v); - }, - [](vector& v){return move(v);}) | - as_dynamic(); + }) | + as_dynamic(); }) | - filter([](vector& v){ + tap([](vector& v){ + // print input packet of bytes copy(v.begin(), v.end(), ostream_iterator(cout, " ")); - cout << endl; - return true; + cout << endl; }); + // + // recover lines of text from byte stream + // + // create strings split on \r auto strings = bytes | concat_map([](vector v){ @@ -75,6 +75,9 @@ int main() sregex_token_iterator end; vector splits(cursor, end); return iterate(move(splits)); + }) | + filter([](string& s){ + return !s.empty(); }); // group strings by line @@ -83,18 +86,17 @@ int main() group_by( [=](string& s) mutable { return s.back() == '\r' ? group++ : group; - }, - [](string& s) { return move(s);}); + }); // reduce the strings for a line into one string auto lines = linewindows | - flat_map([](grouped_observable w){ - return w | sum(); + flat_map([](grouped_observable w){ + return w | sum(); }); // print result lines | - subscribe(println(cout)); + subscribe(println(cout)); return 0; } diff --git a/Rx/v2/examples/linesfrombytes/main.cpp b/Rx/v2/examples/linesfrombytes/main.cpp index 30639a32b..b027d1393 100644 --- a/Rx/v2/examples/linesfrombytes/main.cpp +++ b/Rx/v2/examples/linesfrombytes/main.cpp @@ -2,6 +2,7 @@ #include "rxcpp/rx.hpp" using namespace rxcpp; using namespace rxcpp::sources; +using namespace rxcpp::operators; using namespace rxcpp::util; #include @@ -14,64 +15,67 @@ int main() mt19937 gen(rd()); uniform_int_distribution<> dist(4, 18); - // produce byte stream that contains lines of text - auto bytes = range(1, 10). - map([&](int i){ - return from((uint8_t)('A' + i)). - repeat(dist(gen)). - concat(from((uint8_t)'\r')); - }). - merge(). - window(17). - map([](observable w){ - return w. + // for testing purposes, produce byte stream that from lines of text + auto bytes = range(1, 10) | + flat_map([&](int i){ + auto body = from((uint8_t)('A' + i)) | + repeat(dist(gen)) | + as_dynamic(); + auto delim = from((uint8_t)'\r'); + return from(body, delim) | concat(); + }) | + window(17) | + flat_map([](observable w){ + return w | reduce( - vector(), + vector(), [](vector& v, uint8_t b){ - v.push_back(b); + v.push_back(b); return move(v); - }, - [](vector& v){return move(v);}). - as_dynamic(); - }). - merge(). - filter([](vector& v){ + }) | + as_dynamic(); + }) | + tap([](vector& v){ + // print input packet of bytes copy(v.begin(), v.end(), ostream_iterator(cout, " ")); - cout << endl; - return true; + cout << endl; }); + // + // recover lines of text from byte stream + // + // create strings split on \r - auto strings = bytes. - map([](vector v){ + auto strings = bytes | + concat_map([](vector v){ string s(v.begin(), v.end()); regex delim(R"/(\r)/"); - sregex_token_iterator cursor(s.begin(), s.end(), delim, {-1, 0}); - sregex_token_iterator end; + cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0}); + cregex_token_iterator end; vector splits(cursor, end); return iterate(move(splits)); - }). - concat(); + }) | + filter([](string& s){ + return !s.empty(); + }); // group strings by line int group = 0; - auto linewindows = strings. + auto linewindows = strings | group_by( [=](string& s) mutable { return s.back() == '\r' ? group++ : group; - }, - [](string& s) { return move(s);}); + }); // reduce the strings for a line into one string - auto lines = linewindows. - map([](grouped_observable w){ - return w.sum(); - }). - merge(); + auto lines = linewindows | + flat_map([](grouped_observable w) { + return w | sum(); + }); // print result - lines. - subscribe(println(cout)); + lines | + subscribe(println(cout)); return 0; } diff --git a/Rx/v2/src/rxcpp/operators/rx-concat.hpp b/Rx/v2/src/rxcpp/operators/rx-concat.hpp index a92b8b316..f6d637ef0 100644 --- a/Rx/v2/src/rxcpp/operators/rx-concat.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-concat.hpp @@ -203,12 +203,31 @@ class concat_factory } -template +inline auto concat() + -> detail::concat_factory { + return detail::concat_factory(identity_current_thread()); +} + +template::value>::type> auto concat(Coordination&& sf) -> detail::concat_factory { return detail::concat_factory(std::forward(sf)); } +template::value>::type> +auto concat(O0&& o0, ON&&... on) + -> detail::concat_factory { + return detail::concat_factory(identity_current_thread())(from(std::forward(o0), std::forward(on)...)); +} + +template::value>::type, + class CheckO = typename std::enable_if::value>::type> +auto concat(Coordination&& sf, O0&& o0, ON&&... on) + -> detail::concat_factory { + return detail::concat_factory(std::forward(sf))(from(std::forward(o0), std::forward(on)...)); +} + } } diff --git a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp index 49f50af7e..d7b32c8dd 100644 --- a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp @@ -45,7 +45,7 @@ struct concat_traits { static_assert(!std::is_same(0)), tag_not_valid>::value, "concat_map ResultSelector must be a function with the signature concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)"); - typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type; + typedef rxu::decay_t value_type; }; template @@ -267,6 +267,19 @@ auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) return detail::concat_map_factory(std::forward(s), std::forward(rs), std::forward(sf)); } +template::value>::type> +auto concat_map(CollectionSelector&& s, Coordination&& sf) + -> detail::concat_map_factory, Coordination> { + return detail::concat_map_factory, Coordination>(std::forward(s), rxu::take_at<1>(), std::forward(sf)); +} + +template +auto concat_map(CollectionSelector&& s) + -> detail::concat_map_factory, identity_one_worker> { + return detail::concat_map_factory, identity_one_worker>(std::forward(s), rxu::take_at<1>(), identity_current_thread()); +} + + } } diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp index dcc3521cd..7f5aae1f0 100644 --- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp @@ -30,7 +30,7 @@ struct flat_map_traits { static_assert(!std::is_same(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)"); - typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type; + typedef rxu::decay_t collection_type; static_assert(is_observable::value, "flat_map CollectionSelector must return an observable"); @@ -43,7 +43,7 @@ struct flat_map_traits { static_assert(!std::is_same(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)"); - typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type; + typedef rxu::decay_t value_type; }; template @@ -234,8 +234,20 @@ auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) return detail::flat_map_factory(std::forward(s), std::forward(rs), std::forward(sf)); } +template::value>::type> +auto flat_map(CollectionSelector&& s, Coordination&& sf) + -> detail::flat_map_factory, Coordination> { + return detail::flat_map_factory, Coordination>(std::forward(s), rxu::take_at<1>(), std::forward(sf)); } +template +auto flat_map(CollectionSelector&& s) + -> detail::flat_map_factory, identity_one_worker> { + return detail::flat_map_factory, identity_one_worker>(std::forward(s), rxu::take_at<1>(), identity_current_thread()); } -#endif \ No newline at end of file +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp index 203f3c974..6da0af021 100644 --- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp @@ -190,7 +190,7 @@ class group_by_factory template struct group_by_factory_traits { - typedef rxu::value_type_t value_type; + typedef rxu::value_type_t> value_type; typedef detail::group_by_traits traits_type; typedef detail::group_by group_by_type; }; @@ -209,6 +209,18 @@ inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p) return detail::group_by_factory(std::move(ks), std::move(ms), std::move(p)); } +template +inline auto group_by(KeySelector ks, MarbleSelector ms) + -> detail::group_by_factory { + return detail::group_by_factory(std::move(ks), std::move(ms), rxu::less()); +} + +template +inline auto group_by(KeySelector ks) + -> detail::group_by_factory, rxu::less> { + return detail::group_by_factory, rxu::less>(std::move(ks), rxu::take_at<0>(), rxu::less()); +} + } diff --git a/Rx/v2/src/rxcpp/operators/rx-merge.hpp b/Rx/v2/src/rxcpp/operators/rx-merge.hpp index 17409d3db..c46e752f8 100644 --- a/Rx/v2/src/rxcpp/operators/rx-merge.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-merge.hpp @@ -182,6 +182,11 @@ class merge_factory } +inline auto merge() + -> detail::merge_factory { + return detail::merge_factory(identity_current_thread()); +} + template auto merge(Coordination&& sf) -> detail::merge_factory { diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp index 144c22542..c3e234fa5 100644 --- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp @@ -51,7 +51,7 @@ struct is_result_function_for { template static tag_not_valid check(...); - typedef decltype(check(0)) type; + typedef rxu::decay_t(0))> type; static const bool value = !std::is_same::value; }; @@ -69,7 +69,7 @@ struct reduce_traits static_assert(is_result_function_for::value, "reduce ResultSelector must be a function with the signature reduce::value_type(Seed)"); - typedef typename is_result_function_for::type value_type; + typedef rxu::decay_t::type> value_type; }; template @@ -189,6 +189,22 @@ class reduce_factory } }; +template class Factory> +class delay_reduce_factory +{ + template using accumulator_t = Factory>; + template using result_selector_t = Factory>; + template using seed_t = typename Factory>::seed_type; + template using result_value_t = decltype(result_selector_t()(*(seed_t*)nullptr)); +public: + template + auto operator()(const Observable& source) + -> observable, reduce, typename Observable::source_operator_type, accumulator_t, result_selector_t, seed_t>> { + return observable, reduce, typename Observable::source_operator_type, accumulator_t, result_selector_t, seed_t>>( + reduce, typename Observable::source_operator_type, accumulator_t, result_selector_t, seed_t>(source.source_operator, accumulator_t(), result_selector_t(), accumulator_t().seed())); + } +}; + template struct initialize_seeder { typedef T seed_type; @@ -213,7 +229,7 @@ struct average { seed_type seed() { return seed_type{}; } - seed_type operator()(seed_type a, T v) { + seed_type operator()(seed_type& a, T v) { if (a.count != 0 && (a.count == std::numeric_limits::max() || ((v > 0) && (a.value > (std::numeric_limits::max() - v))) || @@ -235,7 +251,7 @@ struct average { } return a; } - double operator()(seed_type a) { + double operator()(seed_type& a) { if (a.count > 0) { double avg = a.value / a.count; if (!a.stage.empty()) { @@ -253,14 +269,14 @@ struct sum { seed_type seed() { return seed_type(); } - seed_type operator()(seed_type a, T v) { + seed_type operator()(seed_type& a, T v) { if (a.empty()) a.reset(v); else *a = *a + v; return a; } - T operator()(seed_type a) { + T operator()(seed_type& a) { if (a.empty()) throw rxcpp::empty_error("sum() requires a stream with at least one value"); return *a; @@ -273,14 +289,14 @@ struct max { seed_type seed() { return seed_type(); } - seed_type operator()(seed_type a, T v) { + seed_type operator()(seed_type& a, T v) { if (a.empty()) a.reset(v); else *a = (v < *a ? *a : v); return a; } - T operator()(seed_type a) { + T operator()(seed_type& a) { if (a.empty()) throw rxcpp::empty_error("max() requires a stream with at least one value"); return *a; @@ -293,14 +309,14 @@ struct min { seed_type seed() { return seed_type(); } - seed_type operator()(seed_type a, T v) { + seed_type operator()(seed_type& a, T v) { if (a.empty()) a.reset(v); else *a = (*a < v ? *a : v); return a; } - T operator()(seed_type a) { + T operator()(seed_type& a) { if (a.empty()) throw rxcpp::empty_error("min() requires a stream with at least one value"); return *a; @@ -315,6 +331,37 @@ auto reduce(Seed s, Accumulator a, ResultSelector rs) return detail::reduce_factory(std::move(a), std::move(rs), std::move(s)); } +template +auto reduce(Seed s, Accumulator a) + -> detail::reduce_factory, Seed> { + return detail::reduce_factory, Seed>(std::move(a), rxu::take_at<0>(), std::move(s)); +} + +inline auto count() + -> detail::reduce_factory, int> { + return detail::reduce_factory, int>(rxu::count(), rxu::take_at<0>(), 0); +} + +inline auto average() + -> detail::delay_reduce_factory { + return detail::delay_reduce_factory(); +} + +inline auto sum() + -> detail::delay_reduce_factory { + return detail::delay_reduce_factory(); +} + +inline auto min() + -> detail::delay_reduce_factory { + return detail::delay_reduce_factory(); +} + +inline auto max() + -> detail::delay_reduce_factory { + return detail::delay_reduce_factory(); +} + } } diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp index 438f4610f..90de9a0b3 100644 --- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp @@ -105,9 +105,9 @@ class repeat_factory template auto operator()(Observable&& source) - -> observable>, repeat>, Observable, count_type>> { - return observable>, repeat>, Observable, count_type>>( - repeat>, Observable, count_type>(std::forward(source), count)); + -> observable>, repeat>, rxu::decay_t, count_type>> { + return observable>, repeat>, rxu::decay_t, count_type>>( + repeat>, rxu::decay_t, count_type>(std::forward(source), count)); } }; @@ -115,7 +115,7 @@ class repeat_factory template auto repeat(T&& t) --> detail::repeat_factory { + -> detail::repeat_factory { return detail::repeat_factory(std::forward(t)); } diff --git a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp index b532d6e4d..031bc5393 100644 --- a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp +++ b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp @@ -174,8 +174,8 @@ class grouped_observable // support range() >> filter() >> subscribe() syntax // '>>' is spelled 'stream' // -template -auto operator >> (const rxcpp::grouped_observable& source, OperatorFactory&& of) +template +auto operator >> (const rxcpp::grouped_observable& source, OperatorFactory&& of) -> decltype(source.op(std::forward(of))) { return source.op(std::forward(of)); } @@ -184,8 +184,8 @@ auto operator >> (const rxcpp::grouped_observable& source, Op // support range() | filter() | subscribe() syntax // '|' is spelled 'pipe' // -template -auto operator | (const rxcpp::grouped_observable& source, OperatorFactory&& of) +template +auto operator | (const rxcpp::grouped_observable& source, OperatorFactory&& of) -> decltype(source.op(std::forward(of))) { return source.op(std::forward(of)); } diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp index a4fb758bd..dcdc99dc2 100644 --- a/Rx/v2/src/rxcpp/rx-util.hpp +++ b/Rx/v2/src/rxcpp/rx-util.hpp @@ -214,6 +214,31 @@ inline auto pack() return detail::pack(); } +namespace detail { + +template +struct take_at +{ + template + auto operator()(ParamN... pn) + -> decay_t(std::make_tuple(std::move(pn)...)))> { + return std::get(std::make_tuple(std::move(pn)...)); + } + template + auto operator()(ParamN... pn) const + -> decay_t(std::make_tuple(std::move(pn)...)))> { + return std::get(std::make_tuple(std::move(pn)...)); + } +}; + +} + +template +inline auto take_at() + -> detail::take_at { + return detail::take_at(); +} + template struct resolve_type;