Compare commits

...

2 Commits

Author SHA1 Message Date
Lily Tsuru f2c01cf924 aries: Implement async I/O algorithm for writing Aries messages
This may also save some memory, because we now no longer need to copy the contents of the memory multiple times.
2024-03-20 23:32:15 -04:00
Lily Tsuru a3a43269fe *: Move most Aries protocol stuff to new "aries" library
Not everything is moved yet, this needs a bit more work.
2024-03-20 23:09:58 -04:00
19 changed files with 380 additions and 248 deletions

View File

@ -25,6 +25,8 @@ add_subdirectory(lib/base)
add_subdirectory(lib/impl) add_subdirectory(lib/impl)
add_subdirectory(lib/http) add_subdirectory(lib/http)
add_subdirectory(lib/aries)
# projects # projects
add_subdirectory(src) add_subdirectory(src)

11
lib/aries/CMakeLists.txt Normal file
View File

@ -0,0 +1,11 @@
add_library(ls_aries
Tags.cpp
)
lobbyserver_target(ls_aries)
target_link_libraries(ls_aries PUBLIC
base::base
)
add_library(ls::aries ALIAS ls_aries)

29
lib/aries/Message.hpp Normal file
View File

@ -0,0 +1,29 @@
#pragma once
#include <base/network_order.hpp>
#include <base/fourcc.hpp>
namespace ls::aries {
/// The Aries message header.
struct [[gnu::packed]] AriesMessageHeader {
/// Message FourCC.
base::FourCC32_t typeCode {};
/// Apparently a extra 4 bytes of FourCC?
base::FourCC32_t typeCodeHi {};
/// The size of the message payload. Is network order (big endian), and includes the size of this header
base::NetworkOrder<u32> messageSize {};
};
/// The raw components of an Aries message. Used by our I/O algoritms.
struct RawAriesMessage {
AriesMessageHeader header;
std::string tagFields;
};
// Sanity checking.
static_assert(sizeof(AriesMessageHeader) == 12, "Aries message header size is invalid");
} // namespace ls::proto

85
lib/aries/MessageIo.hpp Normal file
View File

@ -0,0 +1,85 @@
#include <aries/Message.hpp>
#include <aries/Tags.hpp>
#include <base/types.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include <exception>
#include <impl/asio_config.hpp>
#include "boost/asio/buffer.hpp"
namespace ls::aries {
// Mostly to be conservative. I don't think the game will really care but bleh
constexpr static auto MAX_TAGFIELD_SIZE_IN_MB = 1;
constexpr static auto MAX_TAGFIELD_SIZE_IN_BYTES = MAX_TAGFIELD_SIZE_IN_MB * (1024 * 1024);
namespace errors {
struct TagPayloadTooLarge : std::exception {
TagPayloadTooLarge(u32 size)
: payloadSize(size) {
whatStr = std::format("Tag field data size is over {} MB (Max is {}MB).", (static_cast<u32>(payloadSize) / 1024 / 1024), MAX_TAGFIELD_SIZE_IN_MB);
}
const char* what() const noexcept override {
return whatStr.c_str();
}
private:
u32 payloadSize;
std::string whatStr;
};
} // namespace errors
/// Reads an Aries message from an Boost.Asio async read stream.
/// Returns the raw Aries message header and the tag field buffer.
template <class AsyncReadStream>
base::Awaitable<RawAriesMessage> AsyncReadAriesMessage(AsyncReadStream& stream) {
RawAriesMessage res;
// Read the header first
co_await asio::async_read(stream, asio::buffer(&res.header, sizeof(res.header)), asio::deferred);
auto realPayloadSize = res.header.messageSize - sizeof(res.header);
// Read tag payload (if there is one)
if(res.header.messageSize != sizeof(res.header)) {
// Sanity check. I don't expect game payloads to ever reach this large, but who knows.
if(realPayloadSize > MAX_TAGFIELD_SIZE_IN_BYTES)
throw errors::TagPayloadTooLarge(realPayloadSize);
res.tagFields.resize(realPayloadSize);
co_await asio::async_read(stream, asio::buffer(res.tagFields), asio::deferred);
}
co_return res;
}
template <class AsyncReadStream>
base::Awaitable<void> AsyncWriteAriesMessage(AsyncReadStream& stream, const RawAriesMessage& message) {
auto realTagFieldSize = message.header.messageSize - sizeof(message.header);
// Make sure *we* won't write a message the official Aries protocol
// won't like (even though it'd probably just crash, it's nice for us to do this.)
if(message.header.messageSize != sizeof(message.header)) {
// Sanity check. I don't expect game payloads to ever reach this large, but who knows.
if(realTagFieldSize > MAX_TAGFIELD_SIZE_IN_BYTES)
throw errors::TagPayloadTooLarge(realTagFieldSize);
}
// Our buffer list. We pass this to asio::async_write so we only actually have to perform
// one (scatter-gather) I/O operation in this function
std::array<asio::const_buffer, 2> buffers = {
asio::buffer(&message.header, sizeof(message.header)),
asio::buffer(message.tagFields, realTagFieldSize)
};
co_await asio::async_write(stream, buffers, asio::deferred);
co_return;
}
} // namespace ls::aries

3
lib/aries/README.md Normal file
View File

@ -0,0 +1,3 @@
# aries
Library for working with DirtySDK Aries (old-lobby) protocol messages.

101
lib/aries/Tags.cpp Normal file
View File

@ -0,0 +1,101 @@
#include <aries/Tags.hpp>
namespace ls::aries {
bool ParseTagFieldsToMap(const std::string_view tagFieldData, TagMap& outMap) {
// Nothing to parse,
// which isn't exclusively a failure condition.
if(tagFieldData.empty())
return true;
std::string key;
std::string val;
usize inputIndex = 0;
// TODO: Investigate rewriting this using ragel or something, so it's not something that has to be
// heavily maintained or unit tested to avoid bugs.
//
// Also: maybe we should strongly type or deserialize binary payloads. There are multiple variants of these..
enum class ReaderState : u32 {
InKey, ///< The state machine is currently parsing a key.
InValue ///< The state machine is currently parsing a value.
} state { ReaderState::InKey };
// Parse all properties, using a fairly simple state machine to do so.
//
// State transition mappings:
// = - from key to value state (if in key state)
// \n - from value to key state (if in value state, otherwise error)
while(inputIndex != tagFieldData.size()) {
switch(tagFieldData[inputIndex]) {
case '=':
if(state == ReaderState::InKey) {
state = ReaderState::InValue;
break;
} else {
// If we're in the value state, we're allowed to nest = signs, I think.
// if not, then this is PROBABLY an error state.
val += static_cast<char>(tagFieldData[inputIndex]);
}
break;
case '\n':
if(state == ReaderState::InValue) {
outMap[key] = val;
// Reset the state machine, to read another property.
key.clear();
val.clear();
state = ReaderState::InKey;
break;
} else {
// If we get here in the key state, this is DEFINITELY an error.
return false;
}
// Any other characters are not important to the state machine,
// and are instead written to the given staging string for the current
// state machine state.
default:
switch(state) {
case ReaderState::InKey:
key += tagFieldData[inputIndex];
break;
case ReaderState::InValue:
// Skip past/ignore quotation marks.
if(tagFieldData[inputIndex] == '\"' || tagFieldData[inputIndex] == '\'')
break;
val += tagFieldData[inputIndex];
break;
}
break;
}
inputIndex++;
}
// Parse succeeded
return true;
}
void SerializeTagFields(const TagMap& map, std::string& outStr) {
std::string tagFieldBuffer{};
// Reserve a sane amount, to avoid allocations when serializing
// (in most cases; larger tag count MIGHT still cause some allocation pressure).
tagFieldBuffer.reserve(512);
// Serialize the tag fields
for(auto [key, value] : map)
tagFieldBuffer += std::format("{}={}\n", key, value);
// Null terminate it. (TODO: We shouldn't have to do this anymore, std::string does this on its own)
//tagFieldBuffer.push_back('\0');
outStr = std::move(tagFieldBuffer);
}
} // namespace ls::aries

17
lib/aries/Tags.hpp Normal file
View File

@ -0,0 +1,17 @@
#include <base/types.hpp>
#include <unordered_map>
namespace ls::aries {
using TagMap = std::unordered_map<std::string, std::string>;
/// Parses tag field data to a TagMap.
/// # Returns
/// True on success; false otherwise (TODO: Move to exceptions or error_category)
bool ParseTagFieldsToMap(const std::string_view tagFieldData, TagMap& outMap);
/// Serializes a TagMap to a string.
void SerializeTagFields(const TagMap& map, std::string& outStr);
}

View File

@ -27,6 +27,11 @@ namespace base {
} }
} }
inline std::string FourCC32ToString(FourCC32_t fcc) {
auto* fccAsBytes = std::bit_cast<u8*>(&fcc);
return std::format("{:c}{:c}{:c}{:c}", fccAsBytes[0], fccAsBytes[1], fccAsBytes[2], fccAsBytes[3]);
}
// TODO: 64-bit version which returns a u64 (if required?) // TODO: 64-bit version which returns a u64 (if required?)
} // namespace base } // namespace base

View File

@ -19,4 +19,5 @@ target_link_libraries(lobbyserver PRIVATE
base::base base::base
base::http base::http
Boost::json Boost::json
ls::aries
) )

View File

@ -1,17 +1,19 @@
#include "DirtySockClient.hpp" #include "DirtySockClient.hpp"
#include <boost/asio/read.hpp> #include <aries/MessageIo.hpp>
#include <boost/asio/write.hpp>
#include <impl/asio_config.hpp>
#include "DirtySockServer.hpp" #include "DirtySockServer.hpp"
#include "aries/Message.hpp"
constexpr static auto MAX_PAYLOAD_SIZE_IN_MB = 4;
constexpr static auto MAX_PAYLOAD_SIZE_IN_BYTES = MAX_PAYLOAD_SIZE_IN_MB * (1024 * 1024);
// All our Asio/network related ops set this expiry time before they call Asio ops // All our Asio/network related ops set this expiry time before they call Asio ops
// so that Beast's stream timer stuff can work its magic and automatically timeout. // so that Beast's stream timer stuff can work its magic and automatically timeout.
constexpr static auto EXPIRY_TIME = std::chrono::seconds(10); //
// The read timeout is selected as high as it is mostly because Dirtysock's LobbyAPI seems to prefer pings
// (That is, ICMP Echo Requests) to determine if the server is responding, and does not seem to send
// periodic data as a result.. A bit of a iffy decision, but it was one made over 20 years ago and it worked
// for at least 5, so I can't really complain that much.
constexpr static auto READ_EXPIRY_TIME = std::chrono::seconds(960);
constexpr static auto WRITE_EXPIRY_TIME = std::chrono::seconds(10);
namespace ls { namespace ls {
@ -48,39 +50,28 @@ namespace ls {
} }
base::Awaitable<DirtySockClient::MessagePtr> DirtySockClient::Network_ReadMessage() { base::Awaitable<DirtySockClient::MessagePtr> DirtySockClient::Network_ReadMessage() {
proto::WireMessageHeader header;
std::vector<u8> propertyBuffer;
try { try {
// Read the header first // Set the read expiry time. We only do this once every read because
stream.expires_after(EXPIRY_TIME); // we want the timer to account throughout the whole AsyncReadAriesMessage
co_await asio::async_read(stream, asio::buffer(&header, sizeof(header)), asio::deferred); // operation, which issues multiple I/O operations to complete its work.
stream.expires_after(READ_EXPIRY_TIME);
auto realPayloadSize = header.payloadSize - sizeof(header); auto res = co_await aries::AsyncReadAriesMessage(stream);
// Sanity check. I don't expect game payloads to ever reach this large, but who knows.
if(realPayloadSize > MAX_PAYLOAD_SIZE_IN_BYTES) {
logger->error("{}: WOAH! Client sent a message with a payload size of {} MB (Max is {}MB).", GetAddress().to_string(), (static_cast<u32>(header.payloadSize) / 1024 / 1024), MAX_PAYLOAD_SIZE_IN_MB);
co_return nullptr;
}
// If the message type isn't in the server's allowed message list, give up. // If the message type isn't in the server's allowed message list, give up.
// (we probably should throw instead...) // (we probably should throw instead...)
if(!server->allowedMessages.empty()) { if(!server->allowedMessages.empty()) {
if(!server->allowedMessages.contains(static_cast<base::FourCC32_t>(header.typeCode))) if(!server->allowedMessages.contains(res.header.typeCode))
co_return nullptr; co_return nullptr;
} }
propertyBuffer.resize(realPayloadSize);
stream.expires_after(EXPIRY_TIME);
co_await asio::async_read(stream, asio::buffer(propertyBuffer), asio::deferred);
logger->info("read properties");
// this function may fail and also return nullptr. Maybe we should instead throw an exception there // this function may fail and also return nullptr. Maybe we should instead throw an exception there
// (that we leave to callers to catch) // (that we leave to callers to catch)
co_return MessageFactory::CreateAndParseMessage(header, propertyBuffer); co_return AriesMessageFactory::CreateAndParseMessage(res.header, res.tagFields);
} catch(aries::errors::TagPayloadTooLarge& large) {
logger->error("{}: {}", GetAddress().to_string(), large.what());
co_return nullptr;
} catch(bsys::system_error& ec) { } catch(bsys::system_error& ec) {
// Instead of bubbling up errors we DO care about, rethrow them to the higher level // Instead of bubbling up errors we DO care about, rethrow them to the higher level
// calling us. // calling us.
@ -95,13 +86,13 @@ namespace ls {
} }
base::Awaitable<void> DirtySockClient::Network_WriteMessage(ConstMessagePtr message) { base::Awaitable<void> DirtySockClient::Network_WriteMessage(ConstMessagePtr message) {
auto buf = std::vector<u8> {}; aries::RawAriesMessage serializedMessage;
message->SerializeTo(buf); message->SerializeTo(serializedMessage);
try { try {
stream.expires_after(std::chrono::seconds(EXPIRY_TIME)); stream.expires_after(std::chrono::seconds(WRITE_EXPIRY_TIME));
co_await asio::async_write(stream, asio::buffer(buf), asio::deferred); co_await aries::AsyncWriteAriesMessage(stream, serializedMessage);
} catch(bsys::system_error& ec) { } catch(bsys::system_error& ec) {
if(ec.code() != asio::error::operation_aborted || ec.code() != beast::error::timeout) if(ec.code() != asio::error::operation_aborted || ec.code() != beast::error::timeout)
logger->error("{}: Error in DirtySockClient::Network_WriteMessage(): {}", GetAddress().to_string(), ec.what()); logger->error("{}: Error in DirtySockClient::Network_WriteMessage(): {}", GetAddress().to_string(), ec.what());
@ -164,7 +155,7 @@ namespace ls {
} }
} catch(bsys::system_error& ec) { } catch(bsys::system_error& ec) {
if(ec.code() == asio::error::eof) { if(ec.code() == asio::error::eof) {
logger->info("{}: Connection closed", GetAddress().to_string()); logger->info("Server \"{}\": {} closed connection", GetServer()->name, GetAddress().to_string());
} else if(ec.code() != asio::error::operation_aborted) } else if(ec.code() != asio::error::operation_aborted)
logger->error("{}: Error in DirtySockClient::Coro_ReaderEnd(): {}", GetAddress().to_string(), ec.what()); logger->error("{}: Error in DirtySockClient::Coro_ReaderEnd(): {}", GetAddress().to_string(), ec.what());
} }
@ -174,7 +165,7 @@ namespace ls {
} }
base::Awaitable<void> DirtySockClient::Run() { base::Awaitable<void> DirtySockClient::Run() {
logger->info("{}: Got connection", GetAddress().to_string()); logger->info("Server \"{}\": Got connection from {}", GetServer()->name, GetAddress().to_string());
asio::co_spawn( asio::co_spawn(
stream.get_executor(), [self = shared_from_this()] { stream.get_executor(), [self = shared_from_this()] {
return self->Coro_WriterEnd(); return self->Coro_WriterEnd();

View File

@ -12,8 +12,8 @@ namespace ls {
struct DirtySockServer; struct DirtySockServer;
struct DirtySockClient : public std::enable_shared_from_this<DirtySockClient> { struct DirtySockClient : public std::enable_shared_from_this<DirtySockClient> {
using MessagePtr = base::Ref<IMessage>; using MessagePtr = base::Ref<IAriesMessage>;
using ConstMessagePtr = base::Ref<const IMessage>; using ConstMessagePtr = base::Ref<const IAriesMessage>;
using Protocol = asio::ip::tcp; using Protocol = asio::ip::tcp;
using Stream = base::BeastStream<Protocol>; using Stream = base::BeastStream<Protocol>;

View File

@ -6,8 +6,8 @@
namespace ls { namespace ls {
DirtySockServer::DirtySockServer(asio::any_io_executor exec) DirtySockServer::DirtySockServer(asio::any_io_executor exec, std::string_view name)
: exec(exec), acceptor(exec) { : exec(exec), acceptor(exec), name(name.data(), name.length()) {
} }
void DirtySockServer::Start(const Protocol::endpoint& ep) { void DirtySockServer::Start(const Protocol::endpoint& ep) {
@ -34,7 +34,7 @@ namespace ls {
acceptor.bind(endpoint); acceptor.bind(endpoint);
acceptor.listen(asio::socket_base::max_listen_connections); acceptor.listen(asio::socket_base::max_listen_connections);
logger->info("DirtySockServer listening on {}:{}", endpoint.address().to_string(), endpoint.port()); logger->info("DirtySockServer \"{}\" listening on {}:{}", name, endpoint.address().to_string(), endpoint.port());
while(true) { while(true) {
auto socket = co_await acceptor.async_accept(asio::deferred); auto socket = co_await acceptor.async_accept(asio::deferred);

View File

@ -16,7 +16,7 @@ namespace ls {
/// alias for thing /// alias for thing
using AllowedMessagesSet = std::set<base::FourCC32_t>; using AllowedMessagesSet = std::set<base::FourCC32_t>;
DirtySockServer(asio::any_io_executor exec); DirtySockServer(asio::any_io_executor exec, std::string_view name);
void Start(const Protocol::endpoint& endpoint); void Start(const Protocol::endpoint& endpoint);
@ -39,6 +39,7 @@ namespace ls {
std::set<base::Ref<DirtySockClient>> clientSet; std::set<base::Ref<DirtySockClient>> clientSet;
std::string name;
base::Ref<spdlog::logger> logger = spdlog::get("ls_dsock_server"); base::Ref<spdlog::logger> logger = spdlog::get("ls_dsock_server");
}; };

View File

@ -6,166 +6,66 @@
// So debug message can just reply // So debug message can just reply
#include "DirtySockClient.hpp" #include "DirtySockClient.hpp"
#include "WireMessage.hpp"
namespace ls { namespace ls {
IMessage::IMessage(const proto::WireMessageHeader& header) IAriesMessage::IAriesMessage(const aries::AriesMessageHeader& header)
: header(header) { : header(header) {
} }
bool IMessage::ParseFromInputBuffer(std::span<const u8> inputBuffer) { bool IAriesMessage::ParseFromInputBuffer(const std::string_view inputBuffer) {
// Nothing to parse, return aries::ParseTagFieldsToMap(inputBuffer, tagFields);
// which isn't exclusively a failure condition.
if(inputBuffer.empty())
return true;
std::string key;
std::string val;
usize inputIndex = 0;
// TODO: Investigate rewriting this using ragel or something, so it's not something that has to be
// heavily maintained or unit tested to avoid bugs.
enum class ReaderState : u32 {
InKey, ///< The state machine is currently parsing a key.
InValue ///< The state machine is currently parsing a value.
} state { ReaderState::InKey };
// Parse all properties, using a fairly simple state machine to do so.
//
// State transition mappings:
// = - from key to value state (if in key state)
// \n - from value to key state (if in value state, otherwise error)
while(inputIndex != inputBuffer.size()) {
switch(inputBuffer[inputIndex]) {
case '=':
if(state == ReaderState::InKey) {
state = ReaderState::InValue;
break;
} else {
// If we're in the value state, we're allowed to nest = signs, I think.
// if not, then this is PROBABLY an error state.
val += static_cast<char>(inputBuffer[inputIndex]);
}
break;
case '\n':
if(state == ReaderState::InValue) {
properties[key] = val;
// Reset the state machine, to read another property.
key.clear();
val.clear();
state = ReaderState::InKey;
break;
} else {
// If we get here in the key state, this is DEFINITELY an error.
return false;
} }
// Any other characters are not important to the state machine, void IAriesMessage::SerializeTo(aries::RawAriesMessage& dataBuffer) const {
// and are instead written to the given staging string for the current aries::SerializeTagFields(tagFields, dataBuffer.tagFields);
// state machine state.
default:
switch(state) {
case ReaderState::InKey:
key += static_cast<char>(inputBuffer[inputIndex]);
break;
case ReaderState::InValue:
// Skip past quotation marks.
if(static_cast<char>(inputBuffer[inputIndex]) == '\"' || static_cast<char>(inputBuffer[inputIndex]) == '\'')
break;
val += static_cast<char>(inputBuffer[inputIndex]);
break;
}
break;
}
inputIndex++;
}
// Parse succeeded
return true;
}
void IMessage::SerializeTo(std::vector<u8>& dataBuffer) const {
std::string serializedProperties;
// Reserve a sane amount, to avoid allocations when serializing properties
// (in most cases; larger messages MIGHT still cause some allocation pressure.)
serializedProperties.reserve(512);
// Serialize properties
{
auto i = properties.size();
for(auto [key, value] : properties)
if(--i != 0)
serializedProperties += std::format("{}={}\n", key, value);
else
serializedProperties += std::format("{}={}", key, value);
}
// Null terminate the property data.
serializedProperties.push_back('\0');
// Create an appropriate header for the data. // Create an appropriate header for the data.
proto::WireMessageHeader header { dataBuffer.header = {
.typeCode = static_cast<u32>(TypeCode()), .typeCode = header.typeCode,
.typeCodeHi = 0, .typeCodeHi = header.typeCodeHi,
.payloadSize = sizeof(proto::WireMessageHeader) + serializedProperties.length() - 1 .messageSize = sizeof(aries::AriesMessageHeader) + dataBuffer.tagFields.length()
}; };
auto fullLength = sizeof(proto::WireMessageHeader) + serializedProperties.length();
// Resize the output buffer to the right size
dataBuffer.resize(fullLength);
// Write to the output buffer now.
memcpy(&dataBuffer[0], &header, sizeof(proto::WireMessageHeader));
memcpy(&dataBuffer[sizeof(proto::WireMessageHeader)], serializedProperties.data(), serializedProperties.length());
} }
const std::optional<std::string_view> IMessage::MaybeGetKey(const std::string& key) const { const std::optional<std::string_view> IAriesMessage::MaybeGetKey(const std::string& key) const {
if(properties.find(key) == properties.end()) if(tagFields.find(key) == tagFields.end())
return std::nullopt; return std::nullopt;
else else
return properties.at(key); return tagFields.at(key);
} }
void IMessage::SetOrAddProperty(const std::string& key, const std::string& value) { void IAriesMessage::SetOrAddProperty(const std::string& key, const std::string& value) {
properties[key] = value; tagFields[key] = value;
} }
// message factory // message factory
/// Debug message, used to.. well, debug, obviously. /// Debug message, used to.. well, debug, obviously.
struct DebugMessage : IMessage { struct DebugMessage : IAriesMessage {
explicit DebugMessage(const proto::WireMessageHeader& header) explicit DebugMessage(const aries::AriesMessageHeader& header)
: IMessage(header) { : IAriesMessage(header) {
} }
base::Awaitable<void> Process(base::Ref<ls::DirtySockClient> client) override { base::Awaitable<void> Process(base::Ref<ls::DirtySockClient> client) override {
auto* fccbytes = std::bit_cast<u8*>(&header.typeCode); spdlog::info("Unhandled Aries message \"{}\" \"{}\"", FourCC32ToString(header.typeCode), base::FourCC32ToString(header.typeCodeHi));
if(!tagFields.empty()) {
spdlog::info("With tags:");
spdlog::info("Debug Message: FourCC lo: \"{:c}{:c}{:c}{:c}\"", fccbytes[0], fccbytes[1], fccbytes[2], fccbytes[3]); for(auto [key, value] : tagFields)
spdlog::info("Debug Message Properties:"); spdlog::info("{}={}", key, value);
}
for(auto [key, value] : properties) // This snippet is a fair bit :( however it works to just replay the message.
spdlog::info("{}: {}", key, value); // And it's like 1kb at most it's not going to hurt much for code that will be snipped off
// at some point anyways lol
// a bit :( however it works to just replay the message.
client->Send(std::make_shared<DebugMessage>(*this)); client->Send(std::make_shared<DebugMessage>(*this));
co_return; co_return;
} }
}; };
struct MessageWithFourCC : IMessage { struct AriesSendOnlyMessage : IAriesMessage {
explicit MessageWithFourCC(const proto::WireMessageHeader& header) explicit AriesSendOnlyMessage(const aries::AriesMessageHeader& header)
: IMessage(header) { : IAriesMessage(header) {
} }
base::Awaitable<void> Process(base::Ref<ls::DirtySockClient> client) override { base::Awaitable<void> Process(base::Ref<ls::DirtySockClient> client) override {
@ -174,34 +74,35 @@ namespace ls {
} }
}; };
MessageFactory::FactoryMap& MessageFactory::GetFactoryMap() { AriesMessageFactory::FactoryMap& AriesMessageFactory::GetFactoryMap() {
static MessageFactory::FactoryMap factoryMap; static AriesMessageFactory::FactoryMap factoryMap;
return factoryMap; return factoryMap;
} }
base::Ref<IMessage> MessageFactory::CreateAndParseMessage(const proto::WireMessageHeader& header, std::span<const u8> propertyDataBuffer) { base::Ref<IAriesMessage> AriesMessageFactory::CreateAndParseMessage(const aries::AriesMessageHeader& header, const std::string_view tagFieldData) {
const auto& factories = GetFactoryMap(); const auto& factories = GetFactoryMap();
base::Ref<IMessage> ret = nullptr; base::Ref<IAriesMessage> ret = nullptr;
if(const auto it = factories.find(static_cast<base::FourCC32_t>(header.typeCode)); it != factories.end()) if(const auto it = factories.find(header.typeCode); it != factories.end())
ret = (it->second)(header); ret = (it->second)(header);
else else
ret = std::make_shared<DebugMessage>(header); ret = std::make_shared<DebugMessage>(header);
if(!ret->ParseFromInputBuffer(propertyDataBuffer)) if(!ret->ParseFromInputBuffer(tagFieldData))
return nullptr; return nullptr;
return ret; return ret;
} }
base::Ref<IMessage> MessageFactory::CreateMessageWithFourCC(base::FourCC32_t fourCC) { base::Ref<IAriesMessage> AriesMessageFactory::CreateSendMessage(base::FourCC32_t fourCC, base::FourCC32_t fourccHi) {
auto fakeHeader = proto::WireMessageHeader { // A fake header so that we can just use the constructor of IAriesMessage
static_cast<u32>(fourCC), auto fakeHeader = aries::AriesMessageHeader {
0, fourCC,
0 fourccHi,
sizeof(aries::AriesMessageHeader)
}; };
return std::make_shared<MessageWithFourCC>(fakeHeader); return std::make_shared<AriesSendOnlyMessage>(fakeHeader);
} }
} // namespace ls } // namespace ls

View File

@ -3,27 +3,27 @@
#include <base/types.hpp> #include <base/types.hpp>
#include <impl/asio_config.hpp> #include <impl/asio_config.hpp>
#include "WireMessage.hpp" #include <aries/Message.hpp>
#include <aries/Tags.hpp>
namespace ls { namespace ls {
struct Server;
struct DirtySockClient; struct DirtySockClient;
struct IMessage { /// A higher level repressentation of an Aries message. Contains a method to process the data.
explicit IMessage(const proto::WireMessageHeader& header); struct IAriesMessage {
explicit IAriesMessage(const aries::AriesMessageHeader& header);
virtual ~IMessage() = default; virtual ~IAriesMessage() = default;
/// Parses from input buffer. The data must live until /// Parses from input buffer. The data must live until
/// this function returns. /// this function returns.
/// This function may return false (or later, a more well defined /// This function may return false (or later, a more well defined
/// error code enumeration..) if the parsing fails. /// error code enumeration..) if the parsing fails.
bool ParseFromInputBuffer(std::span<const u8> data); bool ParseFromInputBuffer(const std::string_view data);
/// Serializes to a output data buffer. /// Serializes this Aries message to a user-provided [aries::RawAriesMessage] suitable for
void SerializeTo(std::vector<u8>& dataBuffer) const; /// use with the [aries::AsyncWriteAriesMessage] function.
void SerializeTo(aries::RawAriesMessage& message) const;
base::FourCC32_t TypeCode() const { return static_cast<base::FourCC32_t>(header.typeCode); }
/// Process a single message. /// Process a single message.
virtual base::Awaitable<void> Process(base::Ref<DirtySockClient> client) = 0; virtual base::Awaitable<void> Process(base::Ref<DirtySockClient> client) = 0;
@ -32,42 +32,42 @@ namespace ls {
void SetOrAddProperty(const std::string& key, const std::string& value); void SetOrAddProperty(const std::string& key, const std::string& value);
const proto::WireMessageHeader& GetHeader() const { return header; } const aries::AriesMessageHeader& GetHeader() const { return header; }
protected: protected:
proto::WireMessageHeader header; aries::AriesMessageHeader header;
/// all properties. /// all properties.
std::unordered_map<std::string, std::string> properties {}; aries::TagMap tagFields {};
}; };
struct MessageFactory { struct AriesMessageFactory {
/// Creates and parses the given implementation of IMessage. /// Creates and parses the given implementation of IMessage.
static base::Ref<IMessage> CreateAndParseMessage(const proto::WireMessageHeader& header, std::span<const u8> propertyDataBuffer); static base::Ref<IAriesMessage> CreateAndParseMessage(const aries::AriesMessageHeader& header, const std::string_view propertyDataBuffer);
/// Creates a message intended for sending to a client. /// Creates a message intended for sending to a client.
static base::Ref<IMessage> CreateMessageWithFourCC(base::FourCC32_t fourCC); static base::Ref<IAriesMessage> CreateSendMessage(base::FourCC32_t fourCC, base::FourCC32_t fourccHi = {});
private: private:
template <base::FixedString fourcc, class Impl> template <base::FixedString fourcc, class Impl>
friend struct MessageMixin; friend struct AriesMessageMixIn;
using FactoryMap = std::unordered_map<base::FourCC32_t, base::Ref<IMessage> (*)(const proto::WireMessageHeader&)>; using FactoryMap = std::unordered_map<base::FourCC32_t, base::Ref<IAriesMessage> (*)(const aries::AriesMessageHeader&)>;
static FactoryMap& GetFactoryMap(); static FactoryMap& GetFactoryMap();
}; };
template <base::FixedString fourcc, class Impl> template <base::FixedString fourcc, class Impl>
struct MessageMixin : IMessage { struct AriesMessageMixIn : IAriesMessage {
constexpr static auto TYPE_CODE = base::FourCC32<fourcc>(); constexpr static auto TYPE_CODE = base::FourCC32<fourcc>();
explicit MessageMixin(const proto::WireMessageHeader& header) explicit AriesMessageMixIn(const aries::AriesMessageHeader& header)
: IMessage(header) { : IAriesMessage(header) {
static_cast<void>(registered); static_cast<void>(registered);
} }
private: private:
static bool Register() { static bool Register() {
MessageFactory::GetFactoryMap().insert({ TYPE_CODE, [](const proto::WireMessageHeader& header) -> base::Ref<IMessage> { AriesMessageFactory::GetFactoryMap().insert({ TYPE_CODE, [](const aries::AriesMessageHeader& header) -> base::Ref<IAriesMessage> {
return std::make_shared<Impl>(header); return std::make_shared<Impl>(header);
} }); } });
return true; return true;
@ -76,10 +76,10 @@ namespace ls {
}; };
// :( Makes the boilerplate shorter and sweeter (and easier to change) though. // :( Makes the boilerplate shorter and sweeter (and easier to change) though.
#define LS_MESSAGE(T, fourCC) struct T : public ls::MessageMixin<fourCC, T> #define LS_MESSAGE(T, fourCC) struct T : public ls::AriesMessageMixIn<fourCC, T>
#define LS_MESSAGE_CTOR(T, fourCC) \ #define LS_MESSAGE_CTOR(T, fourCC) \
explicit T(const ls::proto::WireMessageHeader& header) \ explicit T(const ls::aries::AriesMessageHeader& header) \
: ls::MessageMixin<fourCC, T>(header) { \ : ls::AriesMessageMixIn<fourCC, T>(header) { \
} }
} // namespace ls } // namespace ls

View File

@ -16,19 +16,25 @@ namespace ls {
base::Awaitable<void> Server::Start() { base::Awaitable<void> Server::Start() {
// TODO: make mariadb connection first, if this fails blow up // TODO: make mariadb connection first, if this fails blow up
lobbyServer = std::make_shared<DirtySockServer>(exec); lobbyRdirServer = std::make_shared<DirtySockServer>(exec, "Redirector server");
lobbyRdirServer->SetAllowedMessages({ base::FourCC32<"@dir">() });
lobbyServer->Start(config.lobbyListenEndpoint); lobbyRdirServer->Start(config.lobbyListenEndpoint);
if(!lobbyServer->Listening()) { if(!lobbyRdirServer->Listening()) {
// uh oh worm.. // uh oh worm..
logger->error("for some reason lobby server isnt listening.."); logger->error("for some reason the redirector server isnt listening..");
co_return; co_return;
} }
buddyServer = std::make_shared<DirtySockServer>(exec); buddyServer = std::make_shared<DirtySockServer>(exec, "Lobby server");
buddyServer->Start(config.buddyListenEndpoint); buddyServer->Start(config.buddyListenEndpoint);
if(!lobbyRdirServer->Listening()) {
// uh oh worm..
logger->error("for some reason the lobby server isnt listening..");
co_return;
}
// TODO: http server? there's apparently some stuff we can have that uses it // TODO: http server? there's apparently some stuff we can have that uses it

View File

@ -30,7 +30,7 @@ namespace ls {
base::AsyncConditionVariable stopCv; base::AsyncConditionVariable stopCv;
bool stopping { false }; bool stopping { false };
base::Ref<DirtySockServer> lobbyServer; base::Ref<DirtySockServer> lobbyRdirServer;
base::Ref<DirtySockServer> buddyServer; base::Ref<DirtySockServer> buddyServer;
Config config; Config config;

View File

@ -1,21 +0,0 @@
#pragma once
#include <base/network_order.hpp>
namespace ls::proto {
/// The on-wire message header.
struct [[gnu::packed]] WireMessageHeader {
/// Message FourCC.
u32 typeCode {};
/// Apparently a extra 4 bytes of FourCC?
u32 typeCodeHi {};
/// The size of the data payload.
base::NetworkOrder<u32> payloadSize {};
};
// Sanity checking.
static_assert(sizeof(WireMessageHeader) == 12, "Wire message header size is invalid");
} // namespace ls::proto

View File

@ -6,21 +6,21 @@
// clang-format off // clang-format off
LS_MESSAGE(AtDirMessage, "@dir") { LS_MESSAGE(AriesRedirMessage, "@dir") {
LS_MESSAGE_CTOR(AtDirMessage, "@dir") LS_MESSAGE_CTOR(AriesRedirMessage, "@dir")
base::Awaitable<void> Process(base::Ref<ls::DirtySockClient> client) override { base::Awaitable<void> Process(base::Ref<ls::DirtySockClient> client) override {
spdlog::info("Got redir message!"); spdlog::info("Got redir message!");
spdlog::info("@dir Properties:"); spdlog::info("@dir tags:");
for(auto [key, value] : properties) for(auto [key, value] : tagFields)
spdlog::info("{}: {}", key, value); spdlog::info("{}: {}", key, value);
// create our @dir message we send BACK to the client. // create our @dir message we send BACK to the client.
auto rdirOut = ls::MessageFactory::CreateMessageWithFourCC(base::FourCC32<"@dir">()); auto rdirOut = ls::AriesMessageFactory::CreateSendMessage(base::FourCC32<"@dir">());
// TODO: Use the server class to get at this.. // TODO: Please use the server class to get at this..
rdirOut->SetOrAddProperty("ADDR", "192.168.1.149"); rdirOut->SetOrAddProperty("ADDR", "192.168.1.149");
rdirOut->SetOrAddProperty("PORT", "10998"); rdirOut->SetOrAddProperty("PORT", "10998");
// sample // sample