#include #include #include #include #include #include namespace base::http { void WebSocketClient::Send(ConstMessagePtr message) { BASE_ASSERT(message.get() != nullptr, "This function needs a valid message pointer."); if(closed) return; // If the backpressure is too large for this client // then just disconnect it. It's not worth trying to catch up, // especially if this client is actually holding references to messages // and causing what effectively amounts to a memory leak! if(sendQueue.size() > MAX_MESSAGES_IN_QUEUE) { if(!closed && !forceClosed) { forceClosed = true; sendQueue.clear(); asio::co_spawn(stream.get_executor(), Close(), asio::detached); } return; } sendQueue.push_back(message); sendCondition.NotifyOne(); } Awaitable WebSocketClient::Handshake() { BASE_ASSERT(upgrade.IsWebsocketUpgrade(), "Arrived here without a valid WebSocket upgrade?"); // Disable the expiry timeout that the HTTP server placed on the original stream, since // websocket::stream has its own functionality to time itself out. stream.next_layer().expires_never(); stream.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::server)); // decorate the handshake with our common server field(s) stream.set_option(beast::websocket::stream_base::decorator([](beast::websocket::response_type& res) { SetCommonResponseFields(res); })); // try accepting try { co_await stream.async_accept(upgrade.Native(), asio::deferred); // Spawn coroutines for read/write end now that we actually performed the websokcet handshake auto exec = stream.next_layer().get_executor(); asio::co_spawn(exec, WriteEnd(), asio::detached); asio::co_spawn(exec, ReadEnd(), asio::detached); co_return true; } catch(bsys::system_error& ec) { // just close the socket then. bsys::error_code ignore; stream.next_layer().socket().close(ignore); co_return false; } co_return false; } Awaitable WebSocketClient::Close() { return CloseWithReason("Generic close."); } Awaitable WebSocketClient::CloseWithReason(const std::string& reason) { auto self = shared_from_this(); // don't try to close more than once // (this avoids calling the close handler // more than once, or trying to close the WebSocket stream // multiple times, avoiding crashes.) if(self->closed) co_return; self->closed = true; if(self->listener) co_await self->listener->OnClose(); try { if(self->stream.next_layer().socket().is_open()) { co_await self->stream.async_close(beast::websocket::close_reason { beast::websocket::close_code::try_again_later, reason }, asio::deferred); } } catch(bsys::error_code& ec) { spdlog::error("Error when closing (not fatal): {}", ec.what()); } // notify the send task that it needs to stop self->sendQueue.clear(); self->sendCondition.NotifyAll(); co_return; } asio::ip::address WebSocketClient::Address() { // N.B. The previous "get it from the socket" method would "work" // but we don't support any sort of multipath, so the IP we get // the upgrade request on (all things considered) will be the IP we read #ifdef BASE_HTTP_REVERSE_PROXY_SUPPORT return http::GetProxyAddress(upgrade.src, upgrade.Native()); #else return upgrade.src; #endif } Awaitable WebSocketClient::WriteEnd() { auto self = shared_from_this(); try { while(true) { // If the send queue was empty, then there isn't really anything we can do if(self->sendQueue.empty()) { self->sending = false; self->sendCondition.NotifyOne(); co_await self->sendCondition.Wait([self]() { if(self->closed) return true; return !self->sendQueue.empty(); }); } if(closed) break; self->sending = true; auto& message = self->sendQueue.front(); if(message->GetType() == WebSocketMessage::Type::Text) { self->stream.text(true); co_await self->stream.async_write(asio::buffer(message->AsText()), asio::deferred); } else if(message->GetType() == WebSocketMessage::Type::Binary) { self->stream.binary(true); co_await self->stream.async_write(asio::buffer(message->AsBinary()), asio::deferred); } // pop the element off the queue self->sendQueue.erase(sendQueue.begin()); self->sendQueue.shrink_to_fit(); } } catch(bsys::system_error& ec) { // spdlog::error("failure in write end: {}", ec.what()); } co_await self->Close(); co_return; } Awaitable WebSocketClient::ReadEnd() { auto self = shared_from_this(); beast::flat_buffer messageBuffer; try { while(true) { // wait for the send end to be done sending messages, if it decides to wake up co_await self->sendCondition.Wait([self]() { if(self->closed) return true; return !self->sending; }); // If the connection was closed break out if(self->closed) break; // let's try reading a message auto b = co_await self->stream.async_read(messageBuffer, asio::deferred); if(self->listener) { if(self->stream.got_text()) { co_await self->listener->OnMessage(std::make_shared( std::string { reinterpret_cast(messageBuffer.data().data()), messageBuffer.size() })); } else if(self->stream.got_binary()) { co_await self->listener->OnMessage(std::make_shared( std::span { reinterpret_cast(messageBuffer.data().data()), messageBuffer.size() })); } } messageBuffer.consume(b); // notify the send end we're done reading, so it can (if the queue isn't empty) write // this might end up blocking us from reading for a while if it decides it can. self->sendCondition.NotifyOne(); } } catch(bsys::system_error& ec) { // TODO: this should be re-enabled but doesn't seem to limit properly // if(ec.code() != beast::websocket::error::closed || ec.code() != asio::error::eof) // spdlog::error("fail in read end {}", ec.what()); } co_await self->Close(); co_return; } } // namespace base::http