diff options
author | underengineering <[email protected]> | 2024-05-10 14:32:50 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2024-05-10 12:32:50 +0100 |
commit | 37a84c5223279ce16152db69fba204df365c4eda (patch) | |
tree | 7eca797538998e0dcbcb0821deba9e23b100fbfb | |
parent | c19903eaf8f3908e864e57c8449325e9b5e0c9bd (diff) | |
download | Hyprland-37a84c5223279ce16152db69fba204df365c4eda.tar.gz Hyprland-37a84c5223279ce16152db69fba204df365c4eda.zip |
socket2: fix events being reordered (#5955)
* socket2: fix events being reordered
* remove WL_EVENT_READABLE
* initialize eventSource in SClient
* add more logs
oopsie
* replace unordered_map with vector
* fix reordering when socket becomes writable before queue is flushed
* ignore EAGAIN when accepting connection
* use g_pEventManager
-rw-r--r-- | src/Compositor.cpp | 4 | ||||
-rw-r--r-- | src/managers/EventManager.cpp | 210 | ||||
-rw-r--r-- | src/managers/EventManager.hpp | 37 |
3 files changed, 138 insertions, 113 deletions
diff --git a/src/Compositor.cpp b/src/Compositor.cpp index bed2b203..9a9ea30b 100644 --- a/src/Compositor.cpp +++ b/src/Compositor.cpp @@ -338,9 +338,6 @@ void CCompositor::cleanup() { m_pLastFocus = nullptr; m_pLastWindow.reset(); - // end threads - g_pEventManager->m_tThread = std::thread(); - m_vWorkspaces.clear(); m_vWindows.clear(); @@ -463,7 +460,6 @@ void CCompositor::initManagers(eManagersInitStage stage) { Debug::log(LOG, "Creating the EventManager!"); g_pEventManager = std::make_unique<CEventManager>(); - g_pEventManager->startThread(); Debug::log(LOG, "Creating the HyprDebugOverlay!"); g_pDebugOverlay = std::make_unique<CHyprDebugOverlay>(); diff --git a/src/managers/EventManager.cpp b/src/managers/EventManager.cpp index 8605768c..75c98e2a 100644 --- a/src/managers/EventManager.cpp +++ b/src/managers/EventManager.cpp @@ -1,154 +1,178 @@ #include "EventManager.hpp" #include "../Compositor.hpp" -#include <errno.h> -#include <fcntl.h> +#include <algorithm> #include <netinet/in.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> #include <sys/socket.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/un.h> #include <unistd.h> -#include <sys/ioctl.h> - -#include <string> -#include <algorithm> - -CEventManager::CEventManager() {} - -int fdHandleWrite(int fd, uint32_t mask, void* data) { - const auto PEVMGR = (CEventManager*)data; - return PEVMGR->onFDWrite(fd, mask); -} - -int socket2HandleWrite(int fd, uint32_t mask, void* data) { - const auto PEVMGR = (CEventManager*)data; - return PEVMGR->onSocket2Write(fd, mask); -} - -void CEventManager::startThread() { - - m_iSocketFD = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0); +CEventManager::CEventManager() { + m_iSocketFD = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); if (m_iSocketFD < 0) { Debug::log(ERR, "Couldn't start the Hyprland Socket 2. (1) IPC will not work."); return; } sockaddr_un SERVERADDRESS = {.sun_family = AF_UNIX}; - std::string socketPath = g_pCompositor->m_szInstancePath + "/.socket2.sock"; - strncpy(SERVERADDRESS.sun_path, socketPath.c_str(), sizeof(SERVERADDRESS.sun_path) - 1); + const auto PATH = g_pCompositor->m_szInstancePath + "/.socket2.sock"; + if (PATH.length() > sizeof(SERVERADDRESS.sun_path) - 1) { + Debug::log(ERR, "Socket2 path is too long. (2) IPC will not work."); + return; + } - bind(m_iSocketFD, (sockaddr*)&SERVERADDRESS, SUN_LEN(&SERVERADDRESS)); + strncpy(SERVERADDRESS.sun_path, PATH.c_str(), sizeof(SERVERADDRESS.sun_path) - 1); + + if (bind(m_iSocketFD, (sockaddr*)&SERVERADDRESS, SUN_LEN(&SERVERADDRESS)) < 0) { + Debug::log(ERR, "Couldn't bind the Hyprland Socket 2. (3) IPC will not work."); + return; + } // 10 max queued. - listen(m_iSocketFD, 10); + if (listen(m_iSocketFD, 10) < 0) { + Debug::log(ERR, "Couldn't listen on the Hyprland Socket 2. (4) IPC will not work."); + return; + } - m_pEventSource = wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, m_iSocketFD, WL_EVENT_READABLE, socket2HandleWrite, this); + m_pEventSource = wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, m_iSocketFD, WL_EVENT_READABLE, onClientEvent, nullptr); } -int CEventManager::onSocket2Write(int fd, uint32_t mask) { +CEventManager::~CEventManager() { + for (const auto& client : m_vClients) { + wl_event_source_remove(client.eventSource); + close(client.fd); + } + if (m_pEventSource != nullptr) + wl_event_source_remove(m_pEventSource); + + if (m_iSocketFD >= 0) + close(m_iSocketFD); +} + +int CEventManager::onServerEvent(int fd, uint32_t mask, void* data) { + return g_pEventManager->onClientEvent(fd, mask); +} + +int CEventManager::onClientEvent(int fd, uint32_t mask, void* data) { + return g_pEventManager->onServerEvent(fd, mask); +} + +int CEventManager::onServerEvent(int fd, uint32_t mask) { if (mask & WL_EVENT_ERROR || mask & WL_EVENT_HANGUP) { Debug::log(ERR, "Socket2 hangup?? IPC broke"); + wl_event_source_remove(m_pEventSource); + m_pEventSource = nullptr; close(fd); + m_iSocketFD = -1; + return 0; } sockaddr_in clientAddress; socklen_t clientSize = sizeof(clientAddress); const auto ACCEPTEDCONNECTION = accept4(m_iSocketFD, (sockaddr*)&clientAddress, &clientSize, SOCK_CLOEXEC | SOCK_NONBLOCK); + if (ACCEPTEDCONNECTION < 0) { + if (errno != EAGAIN) { + Debug::log(ERR, "Socket2 failed receiving connection, errno: {}", errno); + wl_event_source_remove(m_pEventSource); + m_pEventSource = nullptr; + close(fd); + m_iSocketFD = -1; + } - if (ACCEPTEDCONNECTION > 0) { - Debug::log(LOG, "Socket2 accepted a new client at FD {}", ACCEPTEDCONNECTION); - - // add to event loop so we can close it when we need to - m_dAcceptedSocketFDs.push_back( - std::make_pair<>(ACCEPTEDCONNECTION, wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, ACCEPTEDCONNECTION, WL_EVENT_READABLE, fdHandleWrite, this))); - } else { - Debug::log(ERR, "Socket2 failed receiving connection, errno: {}", errno); - close(fd); + return 0; } - return 0; -} + Debug::log(LOG, "Socket2 accepted a new client at FD {}", ACCEPTEDCONNECTION); -int CEventManager::onFDWrite(int fd, uint32_t mask) { - auto removeFD = [this](int fd) -> void { - for (auto it = m_dAcceptedSocketFDs.begin(); it != m_dAcceptedSocketFDs.end();) { - if (it->first == fd) { - wl_event_source_remove(it->second); // remove this fd listener - it = m_dAcceptedSocketFDs.erase(it); - } else { - it++; - } - } + // add to event loop so we can close it when we need to + auto* eventSource = wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, ACCEPTEDCONNECTION, 0, onServerEvent, nullptr); + m_vClients.emplace_back(SClient{ + ACCEPTEDCONNECTION, + {}, + eventSource, + }); - close(fd); - }; + return 0; +} +int CEventManager::onClientEvent(int fd, uint32_t mask) { if (mask & WL_EVENT_ERROR || mask & WL_EVENT_HANGUP) { - // remove, hanged up - removeFD(fd); + Debug::log(LOG, "Socket2 fd {} hung up", fd); + removeClientByFD(fd); return 0; } - int availableBytes; - if (ioctl(fd, FIONREAD, &availableBytes) == -1) { - Debug::log(ERR, "fd {} sent invalid data (1)", fd); - removeFD(fd); - return 0; - } + if (mask & WL_EVENT_WRITABLE) { + const auto CLIENTIT = findClientByFD(fd); - char buf[availableBytes]; - const auto RECEIVED = recv(fd, buf, availableBytes, 0); - if (RECEIVED == -1) { - Debug::log(ERR, "fd {} sent invalid data (2)", fd); - removeFD(fd); - return 0; + // send all queued events + while (!CLIENTIT->events.empty()) { + const auto& event = CLIENTIT->events.front(); + if (write(CLIENTIT->fd, event->c_str(), event->length()) < 0) + break; + + CLIENTIT->events.pop_front(); + } + + // stop polling when we sent all events + if (CLIENTIT->events.empty()) + wl_event_source_fd_update(CLIENTIT->eventSource, 0); } return 0; } -void CEventManager::flushEvents() { - eventQueueMutex.lock(); - - for (auto& ev : m_dQueuedEvents) { - std::string eventString = (ev.event + ">>" + ev.data).substr(0, 1022) + "\n"; - for (auto& fd : m_dAcceptedSocketFDs) { - try { - write(fd.first, eventString.c_str(), eventString.length()); - } catch (...) {} - } - } +std::vector<CEventManager::SClient>::iterator CEventManager::findClientByFD(int fd) { + return std::find_if(m_vClients.begin(), m_vClients.end(), [fd](const auto& client) { return client.fd == fd; }); +} - m_dQueuedEvents.clear(); +std::vector<CEventManager::SClient>::iterator CEventManager::removeClientByFD(int fd) { + const auto CLIENTIT = findClientByFD(fd); + wl_event_source_remove(CLIENTIT->eventSource); + close(fd); - eventQueueMutex.unlock(); + return m_vClients.erase(CLIENTIT); } -void CEventManager::postEvent(const SHyprIPCEvent event) { +std::string CEventManager::formatEvent(const SHyprIPCEvent& event) const { + std::string_view data = event.data; + auto eventString = std::format("{}>>{}\n", event.event, data.substr(0, 1024)); + std::replace(eventString.begin() + event.event.length() + 2, eventString.end() - 1, '\n', ' '); + return eventString; +} +void CEventManager::postEvent(const SHyprIPCEvent& event) { if (g_pCompositor->m_bIsShuttingDown) { - Debug::log(WARN, "Suppressed (ignoreevents true / shutting down) event of type {}, content: {}", event.event, event.data); + Debug::log(WARN, "Suppressed (shutting down) event of type {}, content: {}", event.event, event.data); return; } - std::thread( - [this](SHyprIPCEvent ev) { - std::replace(ev.data.begin(), ev.data.end(), '\n', ' '); + const size_t MAX_QUEUED_EVENTS = 64; + auto sharedEvent = makeShared<std::string>(formatEvent(event)); + for (auto it = m_vClients.begin(); it != m_vClients.end();) { + // try to send the event immediately if the queue is empty + const auto QUEUESIZE = it->events.size(); + if (QUEUESIZE > 0 || write(it->fd, sharedEvent->c_str(), sharedEvent->length()) < 0) { + if (QUEUESIZE >= MAX_QUEUED_EVENTS) { + // too many events queued, remove the client + Debug::log(ERR, "Socket2 fd {} overflowed event queue, removing", it->fd); + it = removeClientByFD(it->fd); + continue; + } + + // queue it to send later if failed + it->events.push_back(sharedEvent); - eventQueueMutex.lock(); - m_dQueuedEvents.push_back(ev); - eventQueueMutex.unlock(); + // poll for write if queue was empty + if (QUEUESIZE == 0) + wl_event_source_fd_update(it->eventSource, WL_EVENT_WRITABLE); + } - flushEvents(); - }, - event) - .detach(); + ++it; + } } diff --git a/src/managers/EventManager.hpp b/src/managers/EventManager.hpp index ed681dba..94dbab59 100644 --- a/src/managers/EventManager.hpp +++ b/src/managers/EventManager.hpp @@ -1,10 +1,9 @@ #pragma once #include <deque> -#include <fstream> -#include <mutex> +#include <vector> #include "../defines.hpp" -#include "../helpers/MiscFunctions.hpp" +#include "../helpers/memory/SharedPtr.hpp" struct SHyprIPCEvent { std::string event; @@ -14,27 +13,33 @@ struct SHyprIPCEvent { class CEventManager { public: CEventManager(); + ~CEventManager(); - void postEvent(const SHyprIPCEvent event); + void postEvent(const SHyprIPCEvent& event); - void startThread(); - - std::thread m_tThread; + private: + std::string formatEvent(const SHyprIPCEvent& event) const; - int m_iSocketFD = -1; + static int onServerEvent(int fd, uint32_t mask, void* data); + static int onClientEvent(int fd, uint32_t mask, void* data); - int onSocket2Write(int fd, uint32_t mask); - int onFDWrite(int fd, uint32_t mask); + int onServerEvent(int fd, uint32_t mask); + int onClientEvent(int fd, uint32_t mask); - private: - void flushEvents(); + struct SClient { + int fd = -1; + std::deque<SP<std::string>> events; + wl_event_source* eventSource = nullptr; + }; - std::mutex eventQueueMutex; - std::deque<SHyprIPCEvent> m_dQueuedEvents; + std::vector<SClient>::iterator findClientByFD(int fd); + std::vector<SClient>::iterator removeClientByFD(int fd); - std::deque<std::pair<int, wl_event_source*>> m_dAcceptedSocketFDs; + private: + int m_iSocketFD = -1; + wl_event_source* m_pEventSource = nullptr; - wl_event_source* m_pEventSource = nullptr; + std::vector<SClient> m_vClients; }; inline std::unique_ptr<CEventManager> g_pEventManager; |