//! a wrapper over Boost.Asio-provided channel facilities to give it a slightly saner API
//! that we would probably prefer instead of having to manually dick with completion handlers
//! and all that. this is a alternative to asiochan because it seems to be iffy and possibly
//! broken when dealing with multiple threads, causing empty coroutine frames to be generated
//! which cause crashes very quickly. (that actually wasn't because of asiochan, but,
//! using the Asio provided facilities has seemed better anyhow)
#pragma once
#include
#include
#include
namespace base {
// n.b: we only support one signature here. the asio one supports Several but that seems
// unneeded for our use case (and we can use variants to encode any states in a far less.. jagged,
// shall we say, fashion)
//
// also, concurrent_channel is used here because we can be used in multithreaded contexts, often
// for synchronization/message passing between threads in a safe fashion (without having to post back/forth executors)
template
using ChannelImplType = asio::experimental::concurrent_channel;
template
struct Channel {
Channel(asio::any_io_executor exec, usize sendQueueLen = 0) : exec(exec), channel(exec, sendQueueLen) {}
Awaitable Write(const Send& value) { co_await channel.async_send(bsys::error_code {}, value, asio::deferred); }
Awaitable Read() {
// BASE_ASSERT(channel.is_open() == true, "cant really do that with a closed channel now can you");
co_return co_await channel.async_receive(asio::deferred);
}
bool IsOpen() const { return channel.is_open(); }
Awaitable Close() {
channel.close();
co_return;
}
/// get the raw ASIO channel type. Used in the worker thread pool.
auto& Raw() { return channel; }
private:
asio::any_io_executor exec;
ChannelImplType channel;
};
template <>
struct Channel {
Channel(asio::any_io_executor exec) : exec(exec), channel(exec) {}
Awaitable Write() { co_await channel.async_send(bsys::error_code {}, asio::deferred); }
Awaitable Read() {
// BASE_ASSERT(channel.is_open() == true, "cant really do that with a closed channel now can you");
co_await channel.async_receive(asio::deferred);
co_return;
}
bool IsOpen() const { return channel.is_open(); }
Awaitable Close() {
channel.close();
co_return;
}
auto& Raw() { return channel; }
private:
asio::any_io_executor exec;
ChannelImplType channel;
};
// Channel adapters to make producer/consumer logic *way* more typesafe.
template
struct ReadChannel {
// N.B: This is not `explicit` by design, to allow implicitly "downgrading"
// a full duplex channel into a read-only channel (or write-only in the case of
// WriteChannel)
ReadChannel(Channel& chan) : chan(chan) {}
bool IsOpen() const { return chan.IsOpen(); }
Awaitable Read() { return chan.Read(); }
private:
Channel& chan;
};
template
struct WriteChannel {
WriteChannel(Channel& chan) : chan(chan) {}
bool IsOpen() const { return chan.IsOpen(); }
Awaitable Write(const T& val) { co_await chan.Write(val); }
private:
Channel& chan;
};
// A little bit depressing that this can't be sfinae'd away
// (or `requires`'d away), but oh well
template <>
struct WriteChannel {
WriteChannel(Channel& chan) : chan(chan) {}
bool IsOpen() const { return chan.IsOpen(); }
Awaitable Write() { co_await chan.Write(); }
private:
Channel& chan;
};
} // namespace base