aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorunderengineering <[email protected]>2024-05-10 14:32:50 +0300
committerGitHub <[email protected]>2024-05-10 12:32:50 +0100
commit37a84c5223279ce16152db69fba204df365c4eda (patch)
tree7eca797538998e0dcbcb0821deba9e23b100fbfb
parentc19903eaf8f3908e864e57c8449325e9b5e0c9bd (diff)
downloadHyprland-37a84c5223279ce16152db69fba204df365c4eda.tar.gz
Hyprland-37a84c5223279ce16152db69fba204df365c4eda.zip
socket2: fix events being reordered (#5955)
* socket2: fix events being reordered * remove WL_EVENT_READABLE * initialize eventSource in SClient * add more logs oopsie * replace unordered_map with vector * fix reordering when socket becomes writable before queue is flushed * ignore EAGAIN when accepting connection * use g_pEventManager
-rw-r--r--src/Compositor.cpp4
-rw-r--r--src/managers/EventManager.cpp210
-rw-r--r--src/managers/EventManager.hpp37
3 files changed, 138 insertions, 113 deletions
diff --git a/src/Compositor.cpp b/src/Compositor.cpp
index bed2b203..9a9ea30b 100644
--- a/src/Compositor.cpp
+++ b/src/Compositor.cpp
@@ -338,9 +338,6 @@ void CCompositor::cleanup() {
m_pLastFocus = nullptr;
m_pLastWindow.reset();
- // end threads
- g_pEventManager->m_tThread = std::thread();
-
m_vWorkspaces.clear();
m_vWindows.clear();
@@ -463,7 +460,6 @@ void CCompositor::initManagers(eManagersInitStage stage) {
Debug::log(LOG, "Creating the EventManager!");
g_pEventManager = std::make_unique<CEventManager>();
- g_pEventManager->startThread();
Debug::log(LOG, "Creating the HyprDebugOverlay!");
g_pDebugOverlay = std::make_unique<CHyprDebugOverlay>();
diff --git a/src/managers/EventManager.cpp b/src/managers/EventManager.cpp
index 8605768c..75c98e2a 100644
--- a/src/managers/EventManager.cpp
+++ b/src/managers/EventManager.cpp
@@ -1,154 +1,178 @@
#include "EventManager.hpp"
#include "../Compositor.hpp"
-#include <errno.h>
-#include <fcntl.h>
+#include <algorithm>
#include <netinet/in.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
-#include <sys/ioctl.h>
-
-#include <string>
-#include <algorithm>
-
-CEventManager::CEventManager() {}
-
-int fdHandleWrite(int fd, uint32_t mask, void* data) {
- const auto PEVMGR = (CEventManager*)data;
- return PEVMGR->onFDWrite(fd, mask);
-}
-
-int socket2HandleWrite(int fd, uint32_t mask, void* data) {
- const auto PEVMGR = (CEventManager*)data;
- return PEVMGR->onSocket2Write(fd, mask);
-}
-
-void CEventManager::startThread() {
-
- m_iSocketFD = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
+CEventManager::CEventManager() {
+ m_iSocketFD = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
if (m_iSocketFD < 0) {
Debug::log(ERR, "Couldn't start the Hyprland Socket 2. (1) IPC will not work.");
return;
}
sockaddr_un SERVERADDRESS = {.sun_family = AF_UNIX};
- std::string socketPath = g_pCompositor->m_szInstancePath + "/.socket2.sock";
- strncpy(SERVERADDRESS.sun_path, socketPath.c_str(), sizeof(SERVERADDRESS.sun_path) - 1);
+ const auto PATH = g_pCompositor->m_szInstancePath + "/.socket2.sock";
+ if (PATH.length() > sizeof(SERVERADDRESS.sun_path) - 1) {
+ Debug::log(ERR, "Socket2 path is too long. (2) IPC will not work.");
+ return;
+ }
- bind(m_iSocketFD, (sockaddr*)&SERVERADDRESS, SUN_LEN(&SERVERADDRESS));
+ strncpy(SERVERADDRESS.sun_path, PATH.c_str(), sizeof(SERVERADDRESS.sun_path) - 1);
+
+ if (bind(m_iSocketFD, (sockaddr*)&SERVERADDRESS, SUN_LEN(&SERVERADDRESS)) < 0) {
+ Debug::log(ERR, "Couldn't bind the Hyprland Socket 2. (3) IPC will not work.");
+ return;
+ }
// 10 max queued.
- listen(m_iSocketFD, 10);
+ if (listen(m_iSocketFD, 10) < 0) {
+ Debug::log(ERR, "Couldn't listen on the Hyprland Socket 2. (4) IPC will not work.");
+ return;
+ }
- m_pEventSource = wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, m_iSocketFD, WL_EVENT_READABLE, socket2HandleWrite, this);
+ m_pEventSource = wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, m_iSocketFD, WL_EVENT_READABLE, onClientEvent, nullptr);
}
-int CEventManager::onSocket2Write(int fd, uint32_t mask) {
+CEventManager::~CEventManager() {
+ for (const auto& client : m_vClients) {
+ wl_event_source_remove(client.eventSource);
+ close(client.fd);
+ }
+ if (m_pEventSource != nullptr)
+ wl_event_source_remove(m_pEventSource);
+
+ if (m_iSocketFD >= 0)
+ close(m_iSocketFD);
+}
+
+int CEventManager::onServerEvent(int fd, uint32_t mask, void* data) {
+ return g_pEventManager->onClientEvent(fd, mask);
+}
+
+int CEventManager::onClientEvent(int fd, uint32_t mask, void* data) {
+ return g_pEventManager->onServerEvent(fd, mask);
+}
+
+int CEventManager::onServerEvent(int fd, uint32_t mask) {
if (mask & WL_EVENT_ERROR || mask & WL_EVENT_HANGUP) {
Debug::log(ERR, "Socket2 hangup?? IPC broke");
+
wl_event_source_remove(m_pEventSource);
+ m_pEventSource = nullptr;
close(fd);
+ m_iSocketFD = -1;
+
return 0;
}
sockaddr_in clientAddress;
socklen_t clientSize = sizeof(clientAddress);
const auto ACCEPTEDCONNECTION = accept4(m_iSocketFD, (sockaddr*)&clientAddress, &clientSize, SOCK_CLOEXEC | SOCK_NONBLOCK);
+ if (ACCEPTEDCONNECTION < 0) {
+ if (errno != EAGAIN) {
+ Debug::log(ERR, "Socket2 failed receiving connection, errno: {}", errno);
+ wl_event_source_remove(m_pEventSource);
+ m_pEventSource = nullptr;
+ close(fd);
+ m_iSocketFD = -1;
+ }
- if (ACCEPTEDCONNECTION > 0) {
- Debug::log(LOG, "Socket2 accepted a new client at FD {}", ACCEPTEDCONNECTION);
-
- // add to event loop so we can close it when we need to
- m_dAcceptedSocketFDs.push_back(
- std::make_pair<>(ACCEPTEDCONNECTION, wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, ACCEPTEDCONNECTION, WL_EVENT_READABLE, fdHandleWrite, this)));
- } else {
- Debug::log(ERR, "Socket2 failed receiving connection, errno: {}", errno);
- close(fd);
+ return 0;
}
- return 0;
-}
+ Debug::log(LOG, "Socket2 accepted a new client at FD {}", ACCEPTEDCONNECTION);
-int CEventManager::onFDWrite(int fd, uint32_t mask) {
- auto removeFD = [this](int fd) -> void {
- for (auto it = m_dAcceptedSocketFDs.begin(); it != m_dAcceptedSocketFDs.end();) {
- if (it->first == fd) {
- wl_event_source_remove(it->second); // remove this fd listener
- it = m_dAcceptedSocketFDs.erase(it);
- } else {
- it++;
- }
- }
+ // add to event loop so we can close it when we need to
+ auto* eventSource = wl_event_loop_add_fd(g_pCompositor->m_sWLEventLoop, ACCEPTEDCONNECTION, 0, onServerEvent, nullptr);
+ m_vClients.emplace_back(SClient{
+ ACCEPTEDCONNECTION,
+ {},
+ eventSource,
+ });
- close(fd);
- };
+ return 0;
+}
+int CEventManager::onClientEvent(int fd, uint32_t mask) {
if (mask & WL_EVENT_ERROR || mask & WL_EVENT_HANGUP) {
- // remove, hanged up
- removeFD(fd);
+ Debug::log(LOG, "Socket2 fd {} hung up", fd);
+ removeClientByFD(fd);
return 0;
}
- int availableBytes;
- if (ioctl(fd, FIONREAD, &availableBytes) == -1) {
- Debug::log(ERR, "fd {} sent invalid data (1)", fd);
- removeFD(fd);
- return 0;
- }
+ if (mask & WL_EVENT_WRITABLE) {
+ const auto CLIENTIT = findClientByFD(fd);
- char buf[availableBytes];
- const auto RECEIVED = recv(fd, buf, availableBytes, 0);
- if (RECEIVED == -1) {
- Debug::log(ERR, "fd {} sent invalid data (2)", fd);
- removeFD(fd);
- return 0;
+ // send all queued events
+ while (!CLIENTIT->events.empty()) {
+ const auto& event = CLIENTIT->events.front();
+ if (write(CLIENTIT->fd, event->c_str(), event->length()) < 0)
+ break;
+
+ CLIENTIT->events.pop_front();
+ }
+
+ // stop polling when we sent all events
+ if (CLIENTIT->events.empty())
+ wl_event_source_fd_update(CLIENTIT->eventSource, 0);
}
return 0;
}
-void CEventManager::flushEvents() {
- eventQueueMutex.lock();
-
- for (auto& ev : m_dQueuedEvents) {
- std::string eventString = (ev.event + ">>" + ev.data).substr(0, 1022) + "\n";
- for (auto& fd : m_dAcceptedSocketFDs) {
- try {
- write(fd.first, eventString.c_str(), eventString.length());
- } catch (...) {}
- }
- }
+std::vector<CEventManager::SClient>::iterator CEventManager::findClientByFD(int fd) {
+ return std::find_if(m_vClients.begin(), m_vClients.end(), [fd](const auto& client) { return client.fd == fd; });
+}
- m_dQueuedEvents.clear();
+std::vector<CEventManager::SClient>::iterator CEventManager::removeClientByFD(int fd) {
+ const auto CLIENTIT = findClientByFD(fd);
+ wl_event_source_remove(CLIENTIT->eventSource);
+ close(fd);
- eventQueueMutex.unlock();
+ return m_vClients.erase(CLIENTIT);
}
-void CEventManager::postEvent(const SHyprIPCEvent event) {
+std::string CEventManager::formatEvent(const SHyprIPCEvent& event) const {
+ std::string_view data = event.data;
+ auto eventString = std::format("{}>>{}\n", event.event, data.substr(0, 1024));
+ std::replace(eventString.begin() + event.event.length() + 2, eventString.end() - 1, '\n', ' ');
+ return eventString;
+}
+void CEventManager::postEvent(const SHyprIPCEvent& event) {
if (g_pCompositor->m_bIsShuttingDown) {
- Debug::log(WARN, "Suppressed (ignoreevents true / shutting down) event of type {}, content: {}", event.event, event.data);
+ Debug::log(WARN, "Suppressed (shutting down) event of type {}, content: {}", event.event, event.data);
return;
}
- std::thread(
- [this](SHyprIPCEvent ev) {
- std::replace(ev.data.begin(), ev.data.end(), '\n', ' ');
+ const size_t MAX_QUEUED_EVENTS = 64;
+ auto sharedEvent = makeShared<std::string>(formatEvent(event));
+ for (auto it = m_vClients.begin(); it != m_vClients.end();) {
+ // try to send the event immediately if the queue is empty
+ const auto QUEUESIZE = it->events.size();
+ if (QUEUESIZE > 0 || write(it->fd, sharedEvent->c_str(), sharedEvent->length()) < 0) {
+ if (QUEUESIZE >= MAX_QUEUED_EVENTS) {
+ // too many events queued, remove the client
+ Debug::log(ERR, "Socket2 fd {} overflowed event queue, removing", it->fd);
+ it = removeClientByFD(it->fd);
+ continue;
+ }
+
+ // queue it to send later if failed
+ it->events.push_back(sharedEvent);
- eventQueueMutex.lock();
- m_dQueuedEvents.push_back(ev);
- eventQueueMutex.unlock();
+ // poll for write if queue was empty
+ if (QUEUESIZE == 0)
+ wl_event_source_fd_update(it->eventSource, WL_EVENT_WRITABLE);
+ }
- flushEvents();
- },
- event)
- .detach();
+ ++it;
+ }
}
diff --git a/src/managers/EventManager.hpp b/src/managers/EventManager.hpp
index ed681dba..94dbab59 100644
--- a/src/managers/EventManager.hpp
+++ b/src/managers/EventManager.hpp
@@ -1,10 +1,9 @@
#pragma once
#include <deque>
-#include <fstream>
-#include <mutex>
+#include <vector>
#include "../defines.hpp"
-#include "../helpers/MiscFunctions.hpp"
+#include "../helpers/memory/SharedPtr.hpp"
struct SHyprIPCEvent {
std::string event;
@@ -14,27 +13,33 @@ struct SHyprIPCEvent {
class CEventManager {
public:
CEventManager();
+ ~CEventManager();
- void postEvent(const SHyprIPCEvent event);
+ void postEvent(const SHyprIPCEvent& event);
- void startThread();
-
- std::thread m_tThread;
+ private:
+ std::string formatEvent(const SHyprIPCEvent& event) const;
- int m_iSocketFD = -1;
+ static int onServerEvent(int fd, uint32_t mask, void* data);
+ static int onClientEvent(int fd, uint32_t mask, void* data);
- int onSocket2Write(int fd, uint32_t mask);
- int onFDWrite(int fd, uint32_t mask);
+ int onServerEvent(int fd, uint32_t mask);
+ int onClientEvent(int fd, uint32_t mask);
- private:
- void flushEvents();
+ struct SClient {
+ int fd = -1;
+ std::deque<SP<std::string>> events;
+ wl_event_source* eventSource = nullptr;
+ };
- std::mutex eventQueueMutex;
- std::deque<SHyprIPCEvent> m_dQueuedEvents;
+ std::vector<SClient>::iterator findClientByFD(int fd);
+ std::vector<SClient>::iterator removeClientByFD(int fd);
- std::deque<std::pair<int, wl_event_source*>> m_dAcceptedSocketFDs;
+ private:
+ int m_iSocketFD = -1;
+ wl_event_source* m_pEventSource = nullptr;
- wl_event_source* m_pEventSource = nullptr;
+ std::vector<SClient> m_vClients;
};
inline std::unique_ptr<CEventManager> g_pEventManager;