195 lines
6.1 KiB
C++
195 lines
6.1 KiB
C++
#include <cstdint>
|
|
#include <http/utils.hpp>
|
|
#include <http/websocket_client.hpp>
|
|
#include <http/proxy_address.hpp>
|
|
#include <base/assert.hpp>
|
|
#include <spdlog/spdlog.h>
|
|
|
|
namespace base::http {
|
|
|
|
// Queues `message` so the write coroutine can deliver it to the peer.
//
// Nothing is queued when the client is already closed, or once it has been
// force-closed for falling too far behind. `message` must be non-null
// (asserted).
void WebSocketClient::Send(ConstMessagePtr message) {
    BASE_ASSERT(message.get() != nullptr, "This function needs a valid message pointer.");

    // `closed` only becomes true once the Close() coroutine spawned below
    // actually runs. Checking `forceClosed` as well stops us from refilling
    // the queue of a client we have already decided to disconnect.
    if(closed || forceClosed)
        return;

    // If the backpressure is too large for this client then just disconnect
    // it. It's not worth trying to catch up, especially since a stalled
    // client keeps every queued message alive, which effectively amounts to
    // a memory leak.
    if(sendQueue.size() > MAX_MESSAGES_IN_QUEUE) {
        forceClosed = true;
        sendQueue.clear();
        asio::co_spawn(stream.get_executor(), Close(), asio::detached);
        return;
    }

    sendQueue.push_back(message);

    // Wake the write end in case it is waiting for work.
    sendCondition.NotifyOne();
}
|
|
|
|
// Performs the server side of the WebSocket handshake for the stored upgrade
// request.
//
// On success the write and read coroutines are spawned and `true` is
// returned. If a `bsys::system_error` is thrown along the way, the underlying
// socket is closed (ignoring any close error) and `false` is returned.
Awaitable<bool> WebSocketClient::Handshake() {
    BASE_ASSERT(upgrade.IsWebsocketUpgrade(), "Arrived here without a valid WebSocket upgrade?");

    // The HTTP server put an expiry timeout on the original stream; turn it
    // off, because websocket::stream brings its own timeout handling, which
    // is configured right below.
    stream.next_layer().expires_never();
    stream.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::server));

    // Decorate the handshake response with our common server field(s).
    stream.set_option(beast::websocket::stream_base::decorator(
        [](beast::websocket::response_type& res) { SetCommonResponseFields(res); }));

    bool handshakeOk = false;

    try {
        co_await stream.async_accept(upgrade.Native(), asio::deferred);

        // The WebSocket handshake went through, so the read/write ends can
        // be started now.
        auto executor = stream.next_layer().get_executor();
        asio::co_spawn(executor, WriteEnd(), asio::detached);
        asio::co_spawn(executor, ReadEnd(), asio::detached);

        handshakeOk = true;
    } catch(bsys::system_error&) {
        // Accepting failed; just close the socket then.
        bsys::error_code ignored;
        stream.next_layer().socket().close(ignored);
    }

    co_return handshakeOk;
}
|
|
|
|
// Closes the connection with a generic reason string.
//
// This is deliberately a coroutine of its own rather than a plain function
// forwarding the awaitable: CloseWithReason() takes its reason by const
// reference and reads it after suspending. By awaiting the call here, the
// temporary std::string built from the literal stays alive until
// CloseWithReason() has completed. Returning the awaitable directly would
// destroy the temporary at the `return`, leaving a lazily-started coroutine
// (presumably what Awaitable is -- asio::awaitable behaves this way) with a
// dangling reference.
Awaitable<void> WebSocketClient::Close() {
    co_await CloseWithReason("Generic close.");
    co_return;
}
|
|
|
|
// Closes the connection, sending `reason` in the WebSocket close frame.
//
// Only the first call does any work; later calls return immediately. The
// sequence is: mark the client closed, await the listener's OnClose() (if a
// listener is set), close the WebSocket stream if the socket is still open,
// then empty the send queue and wake everything waiting on the send
// condition.
//
// `reason` is taken by reference and read after a suspension point, so the
// referenced string has to stay alive until this coroutine has finished.
Awaitable<void> WebSocketClient::CloseWithReason(const std::string& reason) {
    auto self = shared_from_this();

    // Don't try to close more than once. This avoids calling the close
    // handler more than once, or trying to close the WebSocket stream
    // multiple times, avoiding crashes.
    if(self->closed)
        co_return;

    self->closed = true;

    if(self->listener)
        co_await self->listener->OnClose();

    try {
        if(self->stream.next_layer().socket().is_open()) {
            co_await self->stream.async_close(
                beast::websocket::close_reason { beast::websocket::close_code::try_again_later, reason },
                asio::deferred);
        }
    } catch(const bsys::system_error& ex) {
        // Catch system_error, the same exception type the other coroutines
        // in this file handle for failed stream operations. Catching
        // error_code here would let a failed close escape and skip the
        // wake-up below.
        spdlog::error("Error when closing (not fatal): {}", ex.what());
    }

    // Notify the send task that it needs to stop.
    self->sendQueue.clear();
    self->sendCondition.NotifyAll();
    co_return;
}
|
|
|
|
// Returns the address associated with this client.
//
// N.B. Asking the socket for its peer address would "work" too, but we don't
// support any sort of multipath, so the address the upgrade request arrived
// from is used instead. When BASE_HTTP_REVERSE_PROXY_SUPPORT is defined, that
// address and the upgrade request are handed to http::GetProxyAddress() and
// its result is returned.
asio::ip::address WebSocketClient::Address() {
#ifdef BASE_HTTP_REVERSE_PROXY_SUPPORT
    asio::ip::address clientAddress = http::GetProxyAddress(upgrade.src, upgrade.Native());
#else
    asio::ip::address clientAddress = upgrade.src;
#endif
    return clientAddress;
}
|
|
|
|
// Write half of the connection: drains `sendQueue` onto the WebSocket stream
// until the client is closed or a write fails with `bsys::system_error`,
// then closes the connection.
//
// While the queue is empty, `sending` is cleared and the coroutine waits on
// `sendCondition` until there is something to send or the client is closed.
Awaitable<void> WebSocketClient::WriteEnd() {
    auto self = shared_from_this();

    try {
        while(true) {
            // Nothing queued: flag that we're idle, let the read end know,
            // and sleep until there is work (or the client was closed).
            if(self->sendQueue.empty()) {
                self->sending = false;
                self->sendCondition.NotifyOne();
                co_await self->sendCondition.Wait([self]() {
                    return self->closed || !self->sendQueue.empty();
                });
            }

            if(self->closed)
                break;

            self->sending = true;

            // Take our own handle to the message and pop it off the queue
            // *before* suspending in async_write. Send() and
            // CloseWithReason() may clear() the queue while the write is in
            // flight. Holding only a reference to the front element could
            // let the message be freed mid-write, and the erase would then
            // hit an empty or different queue.
            auto message = self->sendQueue.front();
            self->sendQueue.erase(self->sendQueue.begin());
            self->sendQueue.shrink_to_fit();

            if(message->GetType() == WebSocketMessage::Type::Text) {
                self->stream.text(true);
                co_await self->stream.async_write(asio::buffer(message->AsText()), asio::deferred);
            } else if(message->GetType() == WebSocketMessage::Type::Binary) {
                self->stream.binary(true);
                co_await self->stream.async_write(asio::buffer(message->AsBinary()), asio::deferred);
            }
            // Messages of any other type are dropped without being written.
        }
    } catch(const bsys::system_error&) {
        // Write failures are not logged; the connection is closed below.
    }

    co_await self->Close();
    co_return;
}
|
|
|
|
// Read half of the connection: reads one WebSocket message at a time and
// hands it to the listener (if one is set) until the client is closed or a
// `bsys::system_error` is thrown, then closes the connection.
//
// Each read is preceded by a wait on `sendCondition`, so no read is started
// while the write end has `sending` set.
Awaitable<void> WebSocketClient::ReadEnd() {
    auto self = shared_from_this();
    beast::flat_buffer messageBuffer;

    try {
        for(;;) {
            // Hold off until the write end is idle, or the client was closed.
            co_await self->sendCondition.Wait([self]() {
                return self->closed || !self->sending;
            });

            // Closed in the meantime: stop reading.
            if(self->closed)
                break;

            // Try reading a whole message.
            const auto bytesRead = co_await self->stream.async_read(messageBuffer, asio::deferred);

            if(self->listener) {
                if(self->stream.got_text()) {
                    auto* textData = reinterpret_cast<char*>(messageBuffer.data().data());
                    co_await self->listener->OnMessage(
                        std::make_shared<WebSocketMessage>(std::string { textData, messageBuffer.size() }));
                } else if(self->stream.got_binary()) {
                    auto* binaryData = reinterpret_cast<u8*>(messageBuffer.data().data());
                    co_await self->listener->OnMessage(
                        std::make_shared<WebSocketMessage>(std::span<u8> { binaryData, messageBuffer.size() }));
                }
            }

            // Release the bytes of the message that was just handled.
            messageBuffer.consume(bytesRead);

            // Tell the write end we're done reading so it can write if its
            // queue isn't empty. That may in turn keep us from reading for a
            // while.
            self->sendCondition.NotifyOne();
        }
    } catch(bsys::system_error&) {
        // TODO: logging should be re-enabled here. The disabled filter below
        // joins its two "not equal" tests with `||`, which is true for every
        // error code; it needs `&&` to actually skip closed/eof.
        // if(ec.code() != beast::websocket::error::closed || ec.code() != asio::error::eof)
        //     spdlog::error("fail in read end {}", ec.what());
    }

    co_await self->Close();
    co_return;
}
|
|
} // namespace base::http
|