*: Giant rework

We now have DirtySockClient and DirtySockServer classes which do the actual protocol server work. The server also allow setting the message fourccs that can be accepted, which in theory we can implement derived classes which set that up.. For now we don't do that and run one dummy lobby server

The message factory now handles parsing, and messages now contain the header (mostly for niceity, but in theory we can now bound stuff or smth idk..)

I moved the IMessage code to the root, and I may even move the MessageFactory class to another file if I can.
This commit is contained in:
Lily Tsuru 2024-03-10 05:55:40 -04:00
parent 4851371f56
commit 37f21d8167
14 changed files with 417 additions and 67 deletions

View File

@ -5,7 +5,7 @@ if(WIN32 OR APPLE OR BSD)
endif()
project(SSX3LobbyServer
project(SSX3LobbyServerServer
LANGUAGES CXX
)

View File

@ -1,21 +1,21 @@
#pragma once
#include <base/assert.hpp>
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/basic_waitable_timer.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/generic/stream_protocol.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/local/stream_protocol.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/beast/core/basic_stream.hpp>
#include <boost/beast/core/error.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
namespace asio = boost::asio;
namespace beast = boost::beast;
@ -40,4 +40,18 @@ namespace base {
template <typename Protocol>
using BeastStream = beast::basic_stream<Protocol, ExecutorType>;
/// Exception boilerplate
inline auto DefCoroCompletion(std::string_view name) {
// N.B: name is expected to be a literal
return [name](auto ep) {
if(ep) {
try {
std::rethrow_exception(ep);
} catch(std::exception& e) {
BASE_CHECK(false, "Unhandled exception in task \"{}\": {}", name, e.what());
}
}
};
}
} // namespace base

View File

@ -1,8 +1,14 @@
add_executable(lobbyserver
main.cpp
Server.cpp
DirtySockClient.cpp
DirtySockServer.cpp
IMessage.cpp
# message implementations
messages/IMessage.cpp
messages/PingMessage.cpp
)

82
src/DirtySockClient.cpp Normal file
View File

@ -0,0 +1,82 @@
#include "DirtySockClient.hpp"
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include "DirtySockServer.hpp"
namespace ls {
DirtySockClient::DirtySockClient(Stream stream, base::Ref<DirtySockServer> server)
: stream(std::move(stream)), server(server) {
}
void DirtySockClient::Close() {
stream.close();
}
base::Ref<DirtySockServer> DirtySockClient::GetServer() {
return server;
}
base::Awaitable<DirtySockClient::MessagePtr> DirtySockClient::ReadMessage() {
proto::WireMessageHeader header;
std::vector<u8> propertyBuffer;
try {
co_await asio::async_read(stream, asio::buffer(&header, sizeof(header)), asio::deferred);
propertyBuffer.resize(header.payloadSize);
co_await asio::async_read(stream, asio::buffer(propertyBuffer), asio::deferred);
if(!server->allowedMessages.empty()) {
if(!server->allowedMessages.contains(static_cast<base::FourCC32_t>(header.typeCode)))
co_return nullptr;
}
// this function may fail and also return nullptr. Maybe we should instead throw an exception here
// (that we leave to callers to catch)
co_return MessageFactory::CreateAndParseMessage(header, propertyBuffer);
} catch(bsys::system_error& ec) {
if(ec.code() != asio::error::operation_aborted)
base::LogError("Error in DirtySockClient::WriteMessage(): {}", ec.what());
co_return nullptr;
}
}
base::Awaitable<void> DirtySockClient::WriteMessage(ConstMessagePtr message) {
auto buf = std::vector<u8> {};
message->SerializeTo(buf);
try {
co_await asio::async_write(stream, asio::buffer(buf), asio::deferred);
} catch(bsys::system_error& ec) {
if(ec.code() != asio::error::operation_aborted)
base::LogError("Error in DirtySockClient::WriteMessage(): {}", ec.what());
}
}
base::Awaitable<void> DirtySockClient::Run() {
try {
while(true) {
auto message = co_await ReadMessage();
if(message) {
// is permitted to call WriteMessage
co_await message->Process(shared_from_this());
} else {
// This will occur if parsing fails or etc.
base::LogError("Error parsing message, closing connection");
Close();
co_return;
}
}
} catch(bsys::system_error& ec) {
if(ec.code() != asio::error::operation_aborted)
base::LogError("Error in DirtySockClient::Run(): {}", ec.what());
}
}
} // namespace ls

39
src/DirtySockClient.hpp Normal file
View File

@ -0,0 +1,39 @@
#pragma once
#include <base/types.hpp>
#include <deque>
#include <impl/asio_config.hpp>
#include "IMessage.hpp"
namespace ls {
struct DirtySockServer;
struct DirtySockClient : public std::enable_shared_from_this<DirtySockClient> {
using MessagePtr = base::Ref<IMessage>;
using ConstMessagePtr = base::Ref<const IMessage>;
using Protocol = asio::ip::tcp;
using Stream = base::BeastStream<Protocol>;
DirtySockClient(Stream stream, base::Ref<DirtySockServer> server);
void Close();
base::Ref<DirtySockServer> GetServer();
base::Awaitable<void> WriteMessage(ConstMessagePtr message);
private:
friend struct DirtySockServer;
// internal
base::Awaitable<MessagePtr> ReadMessage();
base::Awaitable<void> Run();
Stream stream;
base::Ref<DirtySockServer> server;
};
} // namespace ls

60
src/DirtySockServer.cpp Normal file
View File

@ -0,0 +1,60 @@
#include "DirtySockServer.hpp"
#include "DirtySockClient.hpp"
namespace ls {
DirtySockServer::DirtySockServer(asio::any_io_executor exec)
: exec(exec), acceptor(exec) {
}
void DirtySockServer::Start(const Protocol::endpoint& ep) {
asio::co_spawn(exec, Listener(ep), base::DefCoroCompletion("EaServer listener"));
}
bool DirtySockServer::Listening() const {
return acceptor.is_open();
}
base::Awaitable<void> DirtySockServer::Listener(const Protocol::endpoint& endpoint) {
try {
acceptor.open(endpoint.protocol());
acceptor.set_option(asio::socket_base::reuse_address(true));
// set SO_REUSEPORT using a custom type. This is flaky but we pin boost
// so this will be ok I suppose
using reuse_port = asio::detail::socket_option::boolean<SOL_SOCKET, SO_REUSEPORT>;
acceptor.set_option(reuse_port(true));
acceptor.set_option(asio::ip::tcp::no_delay { true });
acceptor.bind(endpoint);
acceptor.listen(asio::socket_base::max_listen_connections);
logger.Info("DirtySockServer listening on {}:{}", endpoint.address().to_string(), endpoint.port());
while(true) {
auto socket = co_await acceptor.async_accept(asio::deferred);
auto stream = Stream { std::move(socket) };
asio::co_spawn(exec, RunSession(std::move(stream)), base::DefCoroCompletion("DirtySockServer Session"));
}
} catch(bsys::system_error& ec) {
if(ec.code() != asio::error::operation_aborted)
logger.Error("Error in DirtySockServer::Listener(): {}", ec.what());
}
co_return;
}
base::Awaitable<void> DirtySockServer::RunSession(Stream stream) {
auto client = std::make_shared<DirtySockClient>(std::move(stream), shared_from_this());
clientSet.insert(client);
co_await client->Run();
clientSet.erase(client);
}
} // namespace ls

46
src/DirtySockServer.hpp Normal file
View File

@ -0,0 +1,46 @@
#pragma once
#include <base/fourcc.hpp>
#include <base/logger.hpp>
#include <impl/asio_config.hpp>
#include <memory>
#include <set>
namespace ls {
struct DirtySockClient;
struct DirtySockServer : public std::enable_shared_from_this<DirtySockServer> {
using Protocol = asio::ip::tcp;
using Stream = base::BeastStream<Protocol>;
/// alias for thing
using AllowedMessagesSet = std::set<base::FourCC32_t>;
DirtySockServer(asio::any_io_executor exec);
void Start(const Protocol::endpoint& endpoint);
bool Listening() const;
void SetAllowedMessages(const AllowedMessagesSet& allowedMessageSet) { allowedMessages = allowedMessageSet; }
private:
friend struct DirtySockClient;
const AllowedMessagesSet& GetAllowedMessages() const { return allowedMessages; }
base::Awaitable<void> Listener(const Protocol::endpoint& ep);
base::Awaitable<void> RunSession(Stream stream);
asio::any_io_executor exec;
AllowedMessagesSet allowedMessages;
Protocol::acceptor acceptor;
std::set<base::Ref<DirtySockClient>> clientSet;
// i'm moving to spdlog fuck this
base::Logger logger { base::MakeChannelId(base::MessageSource::Server, base::MessageComponentSource::Server_Server) };
};
} // namespace ls

View File

@ -7,6 +7,10 @@
namespace ls {
IMessage::IMessage(const proto::WireMessageHeader& header)
: header(header) {
}
bool IMessage::ParseFromInputBuffer(std::span<const u8> inputBuffer) {
// Nothing to parse,
// which isn't exclusively a failure condition.
@ -138,14 +142,14 @@ namespace ls {
/// Debug message, used to.. well, debug, obviously.
struct DebugMessage : IMessage {
explicit DebugMessage(base::FourCC32_t myTypeCode)
: myTypeCode(myTypeCode) {
explicit DebugMessage(const proto::WireMessageHeader& header)
: IMessage(header) {
}
base::FourCC32_t TypeCode() const override { return myTypeCode; }
base::FourCC32_t TypeCode() const override { return static_cast<base::FourCC32_t>(header.typeCode); }
base::Awaitable<void> Process(base::Ref<ls::Client> client) override {
auto* fccbytes = ((uint8_t*)&myTypeCode);
base::Awaitable<void> Process(base::Ref<ls::DirtySockClient> client) override {
auto* fccbytes = std::bit_cast<u8*>(&header.typeCode);
base::LogInfo("Debug Message FourCC lo: \"{:c}{:c}{:c}{:c}\"", fccbytes[0], fccbytes[1], fccbytes[2], fccbytes[3]);
base::LogInfo("Debug Message Properties:");
@ -154,9 +158,6 @@ namespace ls {
base::LogInfo("{}: {}", key, value);
co_return;
}
private:
base::FourCC32_t myTypeCode {};
};
MessageFactory::FactoryMap& MessageFactory::GetFactoryMap() {
@ -164,12 +165,19 @@ namespace ls {
return factoryMap;
}
base::Ref<IMessage> MessageFactory::CreateMessage(base::FourCC32_t fourCC) {
base::Ref<IMessage> MessageFactory::CreateAndParseMessage(const proto::WireMessageHeader& header, std::span<const u8> propertyDataBuffer) {
const auto& factories = GetFactoryMap();
if(const auto it = factories.find(fourCC); it == factories.end())
return std::make_shared<DebugMessage>(fourCC);
base::Ref<IMessage> ret = nullptr;
if(const auto it = factories.find(static_cast<base::FourCC32_t>(header.typeCode)); it != factories.end())
ret = (it->second)(header);
else
return (it->second)();
ret = std::make_shared<DebugMessage>(header);
if(ret->ParseFromInputBuffer(propertyDataBuffer))
return nullptr;
return ret;
}
} // namespace ls

View File

@ -1,12 +1,17 @@
#pragma once
#include <base/fourcc.hpp>
#include <base/types.hpp>
#include <impl/asio_config.hpp>
#include "WireMessage.hpp"
namespace ls {
struct Server;
struct Client;
struct DirtySockClient;
struct IMessage {
explicit IMessage(const proto::WireMessageHeader& header);
virtual ~IMessage() = default;
/// Parses from input buffer. The data must live until
@ -21,28 +26,30 @@ namespace ls {
virtual base::FourCC32_t TypeCode() const = 0;
/// Process a single message.
virtual base::Awaitable<void> Process(base::Ref<Client> client) = 0;
virtual base::Awaitable<void> Process(base::Ref<DirtySockClient> client) = 0;
const std::optional<std::string_view> MaybeGetKey(const std::string& key) const;
void SetKey(const std::string& key, const std::string& value);
const proto::WireMessageHeader& GetHeader() const { return header; }
protected:
proto::WireMessageHeader header;
/// all properties.
std::unordered_map<std::string, std::string> properties {};
/// The client this message is for.
base::Ref<Client> client {};
};
struct MessageFactory {
static base::Ref<IMessage> CreateMessage(base::FourCC32_t fourCC);
/// Creates and parses the given implementation of IMessage.
static base::Ref<IMessage> CreateAndParseMessage(const proto::WireMessageHeader& header, std::span<const u8> propertyDataBuffer);
private:
template <base::FixedString fourcc, class Impl>
friend struct MessageMixin;
using FactoryMap = std::unordered_map<base::FourCC32_t, base::Ref<IMessage> (*)()>;
using FactoryMap = std::unordered_map<base::FourCC32_t, base::Ref<IMessage> (*)(const proto::WireMessageHeader&)>;
static FactoryMap& GetFactoryMap();
};
@ -50,8 +57,8 @@ namespace ls {
struct MessageMixin : IMessage {
constexpr static auto TYPE_CODE = base::FourCC32<fourcc>();
explicit MessageMixin()
: IMessage() {
explicit MessageMixin(const proto::WireMessageHeader& header)
: IMessage(header) {
static_cast<void>(registered);
}
@ -61,20 +68,20 @@ namespace ls {
private:
static bool Register() {
MessageFactory::GetFactoryMap().insert({ TYPE_CODE, []() -> base::Ref<IMessage> {
return std::make_shared<Impl>();
MessageFactory::GetFactoryMap().insert({ TYPE_CODE, [](const proto::WireMessageHeader& header) -> base::Ref<IMessage> {
return std::make_shared<Impl>(header);
} });
return true;
return true;
}
static inline bool registered = Register();
};
// :( Makes the boilerplate shorter and sweeter though.
// :( Makes the boilerplate shorter and sweeter (and easier to change) though.
#define LS_MESSAGE(T, fourCC) struct T : public ls::MessageMixin<fourCC, T>
#define LS_MESSAGE_CTOR(T, fourCC) \
using Super = ls::MessageMixin<fourCC, T>; \
explicit T() \
: Super() { \
#define LS_MESSAGE_CTOR(T, fourCC) \
using Super = ls::MessageMixin<fourCC, T>; \
explicit T(const ls::proto::WireMessageHeader& header) \
: Super(header) { \
}
} // namespace ls

46
src/Server.cpp Normal file
View File

@ -0,0 +1,46 @@
#include "Server.hpp"
#include "DirtySockServer.hpp"
#include "impl/asio_config.hpp"
namespace ls {
Server::Server(asio::any_io_executor exec, const Config& cfg)
: exec(exec), stopCv(exec), config(cfg) {
}
Server::~Server() = default; // for now
base::Awaitable<void> Server::Start() {
// TODO: make mariadb connection first, if this fails blow up
lobbyServer = std::make_shared<DirtySockServer>(exec);
lobbyServer->Start(config.lobbyListenEndpoint);
if(!lobbyServer->Listening()) {
// uh oh worm..
logger.Error("for some reason lobby server isnt listening..");
co_return;
}
// TODO: http server? there's apparently some stuff we can have that uses it
// wait to stop
co_await stopCv.Wait([&]() { return stopping; });
// stop the ds and http servers
stopping = false;
stopCv.NotifyAll();
co_return;
}
base::Awaitable<void> Server::Stop() {
stopping = true;
stopCv.NotifyAll();
co_await stopCv.Wait([&]() { return !stopping; });
co_return;
}
} // namespace ls

42
src/Server.hpp Normal file
View File

@ -0,0 +1,42 @@
#pragma once
#include <base/assert.hpp>
#include <base/async_condition_variable.hpp>
#include <impl/asio_config.hpp>
#include <set>
#include "base/logger.hpp"
namespace ls {
struct DirtySockServer;
struct Server {
struct Config {
asio::ip::tcp::endpoint buddyListenEndpoint;
asio::ip::tcp::endpoint lobbyListenEndpoint;
};
Server(asio::any_io_executor exec, const Config& cfg);
~Server();
base::Awaitable<void> Start();
base::Awaitable<void> Stop();
private:
asio::any_io_executor exec;
base::AsyncConditionVariable stopCv;
bool stopping { false };
base::Ref<DirtySockServer> lobbyServer;
Config config;
base::Logger logger { base::MakeChannelId(base::MessageSource::Server, base::MessageComponentSource::Server_Server) };
};
} // namespace ls

View File

@ -1,3 +1,4 @@
#pragma once
#include <base/network_order.hpp>
namespace ls::proto {

View File

@ -1,13 +1,15 @@
#include <base/assert.hpp>
#include <base/stdout_sink.hpp>
#include <base/types.hpp>
#include <thread>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/thread_pool.hpp>
#include <thread>
#include <toml++/toml.hpp>
std::optional<asio::thread_pool> ioc;
// ls server global here
#include "Server.hpp"
asio::io_context ioc(1);
base::Unique<ls::Server> server;
constexpr static std::string_view CONFIG_FILE = "lobbyserver.toml";
@ -21,48 +23,47 @@ base::Awaitable<void> CoWaitForSignal() {
base::LogInfo("SIGINT/SIGTERM recieved, stopping server...");
//co_await server->Stop();
base::LogInfo("Server stopped successfully");
// Deallocate the server
// server.reset();
// At this point, we can now stop the io_context, which will cause
// the main to return and ultimately exit the protgram
ioc->stop();
// After this the main coroutine will handle cleanly shutting down
co_await server->Stop();
co_return;
}
base::Awaitable<void> CoMain() {
//server = std::make_unique<...>(co_await asio::this_coro::executor, config);
//co_await server->Launch();
base::Awaitable<void> CoMain(const ls::Server::Config& config) {
server = std::make_unique<ls::Server>(co_await asio::this_coro::executor, config);
co_await server->Start();
co_return;
}
int main() {
base::LoggerAttachStdout();
auto config = ls::Server::Config {};
try {
auto table = toml::parse_file(CONFIG_FILE);
if(table["lobbyserver"].is_table()) {
auto addr_ptr = table["lobbyserver"]["listen_address"].as_string();
auto port_ptr = table["lobbyserver"]["listen_port"].as_integer();
auto lobby_port_ptr = table["lobbyserver"]["lobby_listen_port"].as_integer();
auto buddy_port_ptr = table["lobbyserver"]["buddy_listen_port"].as_integer();
if(!addr_ptr || !port_ptr) {
if(!addr_ptr || !lobby_port_ptr || !buddy_port_ptr) {
base::LogError("Invalid configuration file \"{}\".", CONFIG_FILE);
return 1;
}
if(port_ptr->get() > 65535) {
base::LogError("Invalid listen port \"{}\", should be 65535 or less", port_ptr->get());
if(lobby_port_ptr->get() > 65535) {
base::LogError("Invalid lobby listen port \"{}\", should be 65535 or less", lobby_port_ptr->get());
return 1;
}
//config.listenEndpoint = { asio::ip::make_address(addr_ptr->get()), static_cast<u16>(port_ptr->get()) };
if(buddy_port_ptr->get() > 65535) {
base::LogError("Invalid buddy listen port \"{}\", should be 65535 or less", buddy_port_ptr->get());
return 1;
}
config.buddyListenEndpoint = { asio::ip::make_address(addr_ptr->get()), static_cast<u16>(buddy_port_ptr->get()) };
config.lobbyListenEndpoint = { asio::ip::make_address(addr_ptr->get()), static_cast<u16>(lobby_port_ptr->get()) };
} else {
base::LogError("Invalid configuration file \"{}\"", CONFIG_FILE);
return 1;
@ -73,9 +74,7 @@ int main() {
return 1;
}
ioc.emplace((std::thread::hardware_concurrency() / 2) - 1);
asio::co_spawn(*ioc, CoWaitForSignal(), [&](auto ep) {
asio::co_spawn(ioc, CoWaitForSignal(), [&](auto ep) {
if(ep) {
try {
std::rethrow_exception(ep);
@ -85,7 +84,7 @@ int main() {
}
});
asio::co_spawn(*ioc, CoMain(), [&](auto ep) {
asio::co_spawn(ioc, CoMain(config), [&](auto ep) {
if(ep) {
try {
std::rethrow_exception(ep);
@ -93,12 +92,12 @@ int main() {
BASE_CHECK(false, "Unhandled exception in server main loop: {}", e.what());
}
} else {
base::LogInfo("Main coroutine returned, stopping server\n");
base::LogInfo("Server returned, exiting process\n");
// done
ioc->stop();
ioc.stop();
}
});
ioc->attach();
ioc.run();
return 0;
}

View File

@ -1,12 +1,12 @@
#include <impl/asio_config.hpp>
#include "base/logger.hpp"
#include "IMessage.hpp"
#include "../IMessage.hpp"
LS_MESSAGE(PingMessage, "~png") {
LS_MESSAGE_CTOR(PingMessage, "~png")
base::Awaitable<void> Process(base::Ref<ls::Client> client) override {
base::Awaitable<void> Process(base::Ref<ls::DirtySockClient> client) override {
base::LogInfo("Got ping message!");
co_return;
}