SSX3LobbyServer/lib/base/channel.hpp

121 lines
3.7 KiB
C++
Raw Permalink Normal View History

//! a wrapper over Boost.Asio-provided channel facilities to give it a slightly saner API
//! that we would probably prefer instead of having to manually dick with completion handlers
//! and all that. this is a alternative to asiochan because it seems to be iffy and possibly
//! broken when dealing with multiple threads, causing empty coroutine frames to be generated
//! which cause crashes very quickly. (that actually wasn't because of asiochan, but,
//! using the Asio provided facilities has seemed better anyhow)
#pragma once
#include <base/assert.hpp>
#include <base/types.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
namespace base {
// n.b: we only support one signature here. the asio one supports Several but that seems
// unneeded for our use case (and we can use variants to encode any states in a far less.. jagged,
// shall we say, fashion)
//
// also, concurrent_channel is used here because we can be used in multithreaded contexts, often
// for synchronization/message passing between threads in a safe fashion (without having to post back/forth executors)
template <class Sig>
using ChannelImplType = asio::experimental::concurrent_channel<Sig>;
template <class Send>
struct Channel {
Channel(asio::any_io_executor exec, usize sendQueueLen = 0) : exec(exec), channel(exec, sendQueueLen) {}
Awaitable<void> Write(const Send& value) { co_await channel.async_send(bsys::error_code {}, value, asio::deferred); }
Awaitable<Send> Read() {
// BASE_ASSERT(channel.is_open() == true, "cant really do that with a closed channel now can you");
co_return co_await channel.async_receive(asio::deferred);
}
bool IsOpen() const { return channel.is_open(); }
Awaitable<void> Close() {
channel.close();
co_return;
}
/// get the raw ASIO channel type. Used in the worker thread pool.
auto& Raw() { return channel; }
private:
asio::any_io_executor exec;
ChannelImplType<void(bsys::error_code, Send)> channel;
};
template <>
struct Channel<void> {
Channel(asio::any_io_executor exec) : exec(exec), channel(exec) {}
Awaitable<void> Write() { co_await channel.async_send(bsys::error_code {}, asio::deferred); }
Awaitable<void> Read() {
// BASE_ASSERT(channel.is_open() == true, "cant really do that with a closed channel now can you");
co_await channel.async_receive(asio::deferred);
co_return;
}
bool IsOpen() const { return channel.is_open(); }
Awaitable<void> Close() {
channel.close();
co_return;
}
auto& Raw() { return channel; }
private:
asio::any_io_executor exec;
ChannelImplType<void(bsys::error_code)> channel;
};
// Channel adapters to make producer/consumer logic *way* more typesafe.
template <class T>
struct ReadChannel {
// N.B: This is not `explicit` by design, to allow implicitly "downgrading"
// a full duplex channel into a read-only channel (or write-only in the case of
// WriteChannel)
ReadChannel(Channel<T>& chan) : chan(chan) {}
bool IsOpen() const { return chan.IsOpen(); }
Awaitable<T> Read() { return chan.Read(); }
private:
Channel<T>& chan;
};
template <class T>
struct WriteChannel {
WriteChannel(Channel<T>& chan) : chan(chan) {}
bool IsOpen() const { return chan.IsOpen(); }
Awaitable<void> Write(const T& val) { co_await chan.Write(val); }
private:
Channel<T>& chan;
};
// A little bit depressing that this can't be sfinae'd away
// (or `requires`'d away), but oh well
template <>
struct WriteChannel<void> {
WriteChannel(Channel<void>& chan) : chan(chan) {}
bool IsOpen() const { return chan.IsOpen(); }
Awaitable<void> Write() { co_await chan.Write(); }
private:
Channel<void>& chan;
};
} // namespace base