// SSX3LobbyServer/lib/http/websocket_client.cpp
// (194 lines, 6.1 KiB, C++)

#include <cstdint>
#include <http/utils.hpp>
#include <http/websocket_client.hpp>
#include <http/proxy_address.hpp>
#include <base/assert.hpp>
namespace base::http {
/// Queues `message` for delivery to this client and wakes one waiter on the send condition.
///
/// Nothing happens once the client is marked closed. When the queue already holds more
/// than MAX_MESSAGES_IN_QUEUE entries the message is dropped, and (the first time this
/// happens) the queue is emptied and a detached Close() is scheduled on the stream's executor.
void WebSocketClient::Send(ConstMessagePtr message) {
    BASE_ASSERT(message.get() != nullptr, "This function needs a valid message pointer.");

    if(closed)
        return;

    const bool overBudget = sendQueue.size() > MAX_MESSAGES_IN_QUEUE;
    if(!overBudget) {
        // Normal path: enqueue and poke the writer.
        sendQueue.push_back(message);
        sendCondition.NotifyOne();
        return;
    }

    // Too much backpressure. Trying to catch up is not worthwhile: a slow client
    // keeps every queued message alive, which is effectively a memory leak.
    // Disconnect it instead, but only kick that off once.
    if(closed || forceClosed)
        return;

    forceClosed = true;
    sendQueue.clear();
    asio::co_spawn(stream.get_executor(), Close(), asio::detached);
}
/// Performs the server side of the WebSocket handshake for the stored upgrade request.
///
/// On success the write and read coroutines are spawned (detached) on the underlying
/// stream's executor and `true` is produced. If the accept throws a system_error the
/// socket is closed and `false` is produced.
Awaitable<bool> WebSocketClient::Handshake() {
    BASE_ASSERT(upgrade.IsWebsocketUpgrade(), "Arrived here without a valid WebSocket upgrade?");

    // The HTTP server put an expiry timeout on the original stream; turn it off,
    // because websocket::stream has its own timeout handling.
    stream.next_layer().expires_never();
    stream.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::server));

    // Decorate the handshake response with our common server field(s).
    stream.set_option(beast::websocket::stream_base::decorator(
        [](beast::websocket::response_type& response) { SetCommonResponseFields(response); }));

    try {
        co_await stream.async_accept(upgrade.Native(), asio::deferred);

        // The WebSocket handshake succeeded, so start the write and read ends.
        auto executor = stream.next_layer().get_executor();
        asio::co_spawn(executor, WriteEnd(), asio::detached);
        asio::co_spawn(executor, ReadEnd(), asio::detached);
        co_return true;
    } catch(const bsys::system_error&) {
        // Accept failed; just close the socket, ignoring any error from that.
        bsys::error_code ignored;
        stream.next_layer().socket().close(ignored);
    }

    co_return false;
}
/// Closes the connection with a generic close reason.
///
/// This is deliberately a coroutine rather than a plain forwarding function.
/// CloseWithReason() takes its reason by const reference, and its body only runs
/// once the returned awaitable is awaited. Awaiting it here keeps the temporary
/// reason string alive until CloseWithReason() has finished; simply returning the
/// awaitable would leave the coroutine holding a reference to a destroyed string.
Awaitable<void> WebSocketClient::Close() {
    co_await CloseWithReason("Generic close.");
    co_return;
}
/// Closes the connection, sending `reason` to the peer in the close frame.
///
/// Only the first call does anything; later calls return immediately. The listener's
/// OnClose() (if a listener is set) is awaited before the WebSocket close is attempted.
/// Afterwards the send queue is emptied and every waiter on the send condition is woken
/// so the read/write coroutines can observe `closed` and stop.
///
/// @param reason Text for the close frame. It is held by reference and read after a
///               suspension point, so the caller's string must stay alive until this
///               coroutine completes.
Awaitable<void> WebSocketClient::CloseWithReason(const std::string& reason) {
    auto self = shared_from_this();

    // Don't try to close more than once. This avoids invoking the close handler
    // repeatedly or closing the WebSocket stream multiple times.
    if(self->closed)
        co_return;
    self->closed = true;

    if(self->listener)
        co_await self->listener->OnClose();

    try {
        if(self->stream.next_layer().socket().is_open()) {
            co_await self->stream.async_close(beast::websocket::close_reason { beast::websocket::close_code::try_again_later, reason },
                                              asio::deferred);
        }
    } catch(const bsys::system_error& ec) {
        // Awaited operations report failure by throwing system_error (not error_code).
        // A failed close is not fatal, and it must not skip the wake-up below.
        logger.Error("Error when closing (not fatal): {}", ec.what());
    }

    // Notify the send task (and anything else waiting) that it needs to stop.
    self->sendQueue.clear();
    self->sendCondition.NotifyAll();
    co_return;
}
/// Returns the address associated with this client.
///
/// Without reverse-proxy support this is the source address recorded on the upgrade
/// request. With BASE_HTTP_REVERSE_PROXY_SUPPORT defined, the result of
/// http::GetProxyAddress() for that source address and the native request is used instead.
asio::ip::address WebSocketClient::Address() {
    // N.B. Asking the socket for its peer address would also "work", but since no
    // form of multipath is supported, the address the upgrade request arrived from
    // is (all things considered) the same one we would read there.
#ifndef BASE_HTTP_REVERSE_PROXY_SUPPORT
    return upgrade.src;
#else
    return http::GetProxyAddress(upgrade.src, upgrade.Native());
#endif
}
/// Write half of the connection: drains the send queue onto the WebSocket stream.
///
/// Runs until the client is marked closed or a write throws a system_error, then
/// awaits Close(). While the queue is empty it clears `sending`, notifies the send
/// condition and waits on it until there is something to send or the client is closed.
Awaitable<void> WebSocketClient::WriteEnd() {
    auto self = shared_from_this();

    try {
        while(true) {
            // Nothing to send: mark ourselves idle, let a waiter know, and sleep
            // until a message is queued or the client is closed.
            if(self->sendQueue.empty()) {
                self->sending = false;
                self->sendCondition.NotifyOne();
                co_await self->sendCondition.Wait([self]() {
                    if(self->closed)
                        return true;
                    return !self->sendQueue.empty();
                });
            }

            if(self->closed)
                break;

            // The queue can be emptied from elsewhere (Send() drops it on overflow)
            // before `closed` is set, so never touch front() without checking.
            if(self->sendQueue.empty())
                continue;

            self->sending = true;

            // Take our own reference to the message and dequeue it *before* suspending.
            // The queue may be cleared or grown while the write is in flight. A reference
            // into the queue could dangle, the message could be freed while its bytes are
            // still being written, and erasing the front afterwards could hit an empty
            // container or remove a different, unsent message.
            const auto message = self->sendQueue.front();
            self->sendQueue.erase(self->sendQueue.begin());
            self->sendQueue.shrink_to_fit();

            if(message->GetType() == WebSocketMessage::Type::Text) {
                self->stream.text(true);
                co_await self->stream.async_write(asio::buffer(message->AsText()), asio::deferred);
            } else if(message->GetType() == WebSocketMessage::Type::Binary) {
                self->stream.binary(true);
                co_await self->stream.async_write(asio::buffer(message->AsBinary()), asio::deferred);
            }
        }
    } catch(const bsys::system_error&) {
        // Write failures are intentionally not logged; fall through and close.
        // logger.Error("failure in write end: {}", ec.what());
    }

    co_await self->Close();
    co_return;
}
/// Read half of the connection: reads messages and hands them to the listener.
///
/// Each iteration first waits on the send condition until the write end is not
/// sending (or the client is closed), then reads one message. Text and binary
/// messages are wrapped in a WebSocketMessage and passed to listener->OnMessage()
/// when a listener is set. The loop ends when the client is closed or a system_error
/// is thrown, after which Close() is awaited.
Awaitable<void> WebSocketClient::ReadEnd() {
    auto self = shared_from_this();
    beast::flat_buffer incoming;

    try {
        for(;;) {
            // Wait for the write end to finish sending, if it decided to wake up.
            co_await self->sendCondition.Wait([self]() {
                return self->closed ? true : !self->sending;
            });

            // Stop as soon as the connection has been closed.
            if(self->closed)
                break;

            // Try reading one message.
            const auto bytesRead = co_await self->stream.async_read(incoming, asio::deferred);

            if(self->listener) {
                if(self->stream.got_text()) {
                    co_await self->listener->OnMessage(std::make_shared<WebSocketMessage>(
                        std::string { reinterpret_cast<char*>(incoming.data().data()), incoming.size() }));
                } else if(self->stream.got_binary()) {
                    co_await self->listener->OnMessage(std::make_shared<WebSocketMessage>(
                        std::span<u8> { reinterpret_cast<u8*>(incoming.data().data()), incoming.size() }));
                }
            }

            incoming.consume(bytesRead);

            // Tell the write end we are done reading so it can write if its queue is
            // not empty. If it does, the next read may be held off for a while.
            self->sendCondition.NotifyOne();
        }
    } catch(const bsys::system_error&) {
        // TODO: logging should be re-enabled here. The filter below never filters anything:
        // with `||` the condition is true for every error code, so it would need `&&`.
        // if(ec.code() != beast::websocket::error::closed || ec.code() != asio::error::eof)
        //     logger.Error("fail in read end {}", ec.what());
    }

    co_await self->Close();
    co_return;
}
} // namespace base::http