diff --git a/src/engine/helper/HL_Process.cpp b/src/engine/helper/HL_Process.cpp index ddfe85c4..01bd4ab0 100644 --- a/src/engine/helper/HL_Process.cpp +++ b/src/engine/helper/HL_Process.cpp @@ -82,6 +82,7 @@ std::string OsProcess::execCommand(const std::string& cmd) #include #include #include +#include std::string OsProcess::execCommand(const std::string& cmd) { @@ -102,6 +103,11 @@ std::string OsProcess::execCommand(const std::string& cmd) } #include +#include +#include +#include +#include +#include "helper/HL_String.h" std::shared_ptr OsProcess::asyncExecCommand(const std::string& cmdline, std::function callback, @@ -115,12 +121,17 @@ std::shared_ptr OsProcess::asyncExecCommand(const std::string& cmdl throw std::runtime_error("Failed to run."); char buffer[1024]; + std::string lines; std::string result = ""; - int f = fileno(pipe); + int fno = fileno(pipe); + + // Make it non blocking + fcntl(fno, F_SETFL, O_NONBLOCK); + while (!feof(pipe) && !finish_flag) { // Wait for more data - struct pollfd pfd{ .fd = f, .events = POLLIN }; + struct pollfd pfd{ .fd = fno, .events = POLLIN }; while (poll(&pfd, 1, 0) == 0 && !finish_flag) ; @@ -129,11 +140,32 @@ std::shared_ptr OsProcess::asyncExecCommand(const std::string& cmdl if (finish_flag) continue; - if (fgets(buffer, 1024, pipe) != nullptr) + int r; + do { - if (callback) - callback(buffer); - result += buffer; + r = read(fno, buffer, sizeof(buffer)-1); + if (r > 0) + { + buffer[r] = 0; + lines += std::string(buffer); + } + } + while (r == sizeof(buffer) - 1); + + if (lines.find("\n") != std::string::npos && callback) + { + std::string::size_type p = 0; + while (p < lines.size()) + { + std::string::size_type d = lines.find("\n", p); + if (d != std::string::npos) + { + callback(StringHelper::trim(lines.substr(p, d-p))); + p = d + 1; + } + } + + lines.erase(0, p); } } diff --git a/src/engine/helper/HL_String.cpp b/src/engine/helper/HL_String.cpp index c0d819f6..96152d5c 100644 --- a/src/engine/helper/HL_String.cpp +++ b/src/engine/helper/HL_String.cpp @@ -27,6 +27,20 @@ std::string StringHelper::extractFilename(const std::string& path) } +std::string StringHelper::appendPath(const std::string& s1, const std::string& s2) +{ + std::string result = s1; + if (!endsWith(result, "/") && !endsWith(result, "\\")) + { +#if defined(TARGET_WIN) + result += "\\"; +#else + result += "/"; +#endif + } + return result + s2; +} + std::string StringHelper::makeUtf8(const std::tstring &arg) { #if defined(TARGET_WIN) @@ -354,6 +368,26 @@ bool StringHelper::startsWith(const std::string& s, const std::string& prefix) return p == 0; } +bool StringHelper::endsWith(const std::string& s, const std::string& suffix) +{ + std::string::size_type p = s.rfind(suffix); + return (p == s.size() - suffix.size()); +} + +int StringHelper::stringToDuration(const std::string& s) +{ + if (endsWith(s, "ms")) + return std::stoi(s.substr(0, s.size()-2)); + if (endsWith(s, "s")) + return std::stoi(s.substr(0, s.size()-1)) * 1000; + if (endsWith(s, "m")) + return std::stoi(s.substr(0, s.size()-1)) * 60000; + if (endsWith(s, "h")) + return std::stoi(s.substr(0, s.size()-1)) * 3600 * 1000; + else + return std::stoi(s) * 1000; +} + #define XML_HEADER "" // --------------------- XcapHelper ----------------- std::string XcapHelper::buildBuddyList(std::string listName, std::vector buddies) diff --git a/src/engine/helper/HL_String.h b/src/engine/helper/HL_String.h index f6f39b77..9ed40211 100644 --- a/src/engine/helper/HL_String.h +++ b/src/engine/helper/HL_String.h @@ -20,6 +20,8 @@ class StringHelper { public: static std::string extractFilename(const std::string& path); + static std::string appendPath(const std::string& s1, const std::string& s2); + static std::string makeUtf8(const std::tstring& arg); static std::tstring makeTstring(const std::string& arg); static int toInt(const char* s, int defaultValue, bool* isOk = nullptr); @@ -59,6 +61,8 @@ public: static std::string replace(const std::string& s, const std::string& tmpl, const std::string& n); static std::string decodeUri(const std::string& s); static bool startsWith(const std::string& s, const std::string& prefix); + static bool endsWith(const std::string& s, const std::string& suffix); + static int stringToDuration(const std::string& s); }; class XcapHelper diff --git a/src/engine/helper/HL_Sync.h b/src/engine/helper/HL_Sync.h index 50071ef7..3fb5d06c 100644 --- a/src/engine/helper/HL_Sync.h +++ b/src/engine/helper/HL_Sync.h @@ -11,6 +11,7 @@ #include #include #include +#include typedef std::recursive_mutex Mutex; typedef std::unique_lock Lock; @@ -22,40 +23,35 @@ public: static long increment(long* value); }; -class Semaphore -{ -private: - unsigned int m_uiCount; - std::mutex m_mutex; - std::condition_variable m_condition; - +class Semaphore { public: - inline Semaphore(unsigned int uiCount) - : m_uiCount(uiCount) { } + Semaphore(unsigned int count = 0) : m_count(count) {} - inline void Wait() - { - std::unique_lock< std::mutex > lock(m_mutex); - m_condition.wait(lock,[&]()->bool{ return m_uiCount>0; }); - --m_uiCount; + void notify() { + std::unique_lock lock(m_mtx); + m_count++; + m_cv.notify_one(); } - template< typename R,typename P > - bool Wait(const std::chrono::duration& crRelTime) - { - std::unique_lock< std::mutex > lock(m_mutex); - if (!m_condition.wait_for(lock,crRelTime,[&]()->bool{ return m_uiCount>0; })) + void wait() { + std::unique_lock lock(m_mtx); + m_cv.wait(lock, [this]() { return m_count > 0; }); + m_count--; + } + + template + bool waitUntil(const std::chrono::time_point& point) { + std::unique_lock lock(m_mtx); + if (!m_cv.wait_until(lock, point, [this]() { return m_count > 0; })) return false; - --m_uiCount; + m_count--; return true; } - inline void Signal() - { - std::unique_lock< std::mutex > lock(m_mutex); - ++m_uiCount; - m_condition.notify_one(); - } +private: + std::mutex m_mtx; + std::condition_variable m_cv; + unsigned int m_count; }; class ThreadHelper @@ -105,4 +101,186 @@ protected: std::vector mBlockList; }; + +// Timer Queue +// +// Allows execution of handlers at a specified time in the future +// Guarantees: +// - All handlers are executed ONCE, even if canceled (aborted parameter will +//be set to true) +// - If TimerQueue is destroyed, it will cancel all handlers. +// - Handlers are ALWAYS executed in the Timer Queue worker thread. +// - Handlers execution order is NOT guaranteed +// +class TimerQueue { +public: + TimerQueue() { + m_th = std::thread([this] { run(); }); + } + + ~TimerQueue() { + cancelAll(); + // Abusing the timer queue to trigger the shutdown. + add(0, [this](bool) { m_finish = true; }); + m_th.join(); + } + + //! Adds a new timer + // \return + // Returns the ID of the new timer. You can use this ID to cancel the + // timer + uint64_t add(int64_t milliseconds, std::function handler) { + WorkItem item; + item.end = Clock::now() + std::chrono::milliseconds(milliseconds); + item.handler = std::move(handler); + + std::unique_lock lk(m_mtx); + uint64_t id = ++m_idcounter; + item.id = id; + m_items.push(std::move(item)); + lk.unlock(); + + // Something changed, so wake up timer thread + m_checkWork.notify(); + return id; + } + + //! Cancels the specified timer + // \return + // 1 if the timer was cancelled. + // 0 if you were too late to cancel (or the timer ID was never valid to + // start with) + size_t cancel(uint64_t id) { + // Instead of removing the item from the container (thus breaking the + // heap integrity), we set the item as having no handler, and put + // that handler on a new item at the top for immediate execution + // The timer thread will then ignore the original item, since it has no + // handler. + std::unique_lock lk(m_mtx); + for (auto&& item : m_items.getContainer()) { + if (item.id == id && item.handler) { + WorkItem newItem; + // Zero time, so it stays at the top for immediate execution + newItem.end = Clock::time_point(); + newItem.id = 0; // Means it is a canceled item + // Move the handler from item to newitem. + // Also, we need to manually set the handler to nullptr, since + // the standard does not guarantee moving an std::function will + // empty it. Some STL implementation will empty it, others will + // not. + newItem.handler = std::move(item.handler); + item.handler = nullptr; + m_items.push(std::move(newItem)); + + lk.unlock(); + // Something changed, so wake up timer thread + m_checkWork.notify(); + return 1; + } + } + return 0; + } + + //! Cancels all timers + // \return + // The number of timers cancelled + size_t cancelAll() { + // Setting all "end" to 0 (for immediate execution) is ok, + // since it maintains the heap integrity + std::unique_lock lk(m_mtx); + for (auto&& item : m_items.getContainer()) { + if (item.id) { + item.end = Clock::time_point(); + item.id = 0; + } + } + auto ret = m_items.size(); + + lk.unlock(); + m_checkWork.notify(); + return ret; + } + +private: + using Clock = std::chrono::steady_clock; + TimerQueue(const TimerQueue&) = delete; + TimerQueue& operator=(const TimerQueue&) = delete; + + void run() { + while (!m_finish) { + auto end = calcWaitTime(); + if (end.first) { + // Timers found, so wait until it expires (or something else + // changes) + m_checkWork.waitUntil(end.second); + } else { + // No timers exist, so wait forever until something changes + m_checkWork.wait(); + } + + // Check and execute as much work as possible, such as, all expired + // timers + checkWork(); + } + + // If we are shutting down, we should not have any items left, + // since the shutdown cancels all items + assert(m_items.size() == 0); + } + + std::pair calcWaitTime() { + std::lock_guard lk(m_mtx); + while (m_items.size()) { + if (m_items.top().handler) { + // Item present, so return the new wait time + return std::make_pair(true, m_items.top().end); + } else { + // Discard empty handlers (they were cancelled) + m_items.pop(); + } + } + + // No items found, so return no wait time (causes the thread to wait + // indefinitely) + return std::make_pair(false, Clock::time_point()); + } + + void checkWork() { + std::unique_lock lk(m_mtx); + while (m_items.size() && m_items.top().end <= Clock::now()) { + WorkItem item(std::move(m_items.top())); + m_items.pop(); + + lk.unlock(); + if (item.handler) + item.handler(item.id == 0); + lk.lock(); + } + } + + Semaphore m_checkWork; + std::thread m_th; + bool m_finish = false; + uint64_t m_idcounter = 0; + + struct WorkItem { + Clock::time_point end; + uint64_t id; // id==0 means it was cancelled + std::function handler; + bool operator>(const WorkItem& other) const { + return end > other.end; + } + }; + + std::mutex m_mtx; + // Inheriting from priority_queue, so we can access the internal container + class Queue : public std::priority_queue, + std::greater> { + public: + std::vector& getContainer() { + return this->c; + } + } m_items; +}; + #endif