/* Copyright(C) 2007-2017 VoIPobjects (voipobjects.com)
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "HL_Sync.h"

// NOTE(review): the original header names were lost when this file was
// extracted; the list below covers what the code in this file uses
// (assert, INT_MAX, memcpy, time/nanosleep, std::cerr) - confirm against VCS.
#include <cassert>
#include <climits>
#include <cstring>
#include <ctime>
#include <iostream>

#ifdef TARGET_OSX
# include <libkern/OSAtomic.h>
#endif

#ifdef TARGET_WIN
# include <Windows.h>
#endif

#if defined(TARGET_LINUX) || defined(TARGET_OSX) || defined(TARGET_ANDROID)
# include <pthread.h>
#endif

// ------------------- SyncHelper -------------------

// Suspends the calling thread for (approximately) the given number of
// microseconds.
// On Windows the resolution is one millisecond: the value is divided by 1000
// before being handed to Sleep(), so requests below 1000us become Sleep(0).
// On OSX/Linux nanosleep() is called once; it is not restarted if it returns
// early. On any other target this function does nothing.
void SyncHelper::delay(unsigned int microseconds)
{
#ifdef TARGET_WIN
    ::Sleep(microseconds / 1000);
#endif

#if defined(TARGET_OSX) || defined(TARGET_LINUX)
    timespec requested;
    requested.tv_sec = microseconds / 1000000;
    requested.tv_nsec = (microseconds % 1000000) * 1000;

    // The remaining time is never inspected, so no output argument is passed.
    nanosleep(&requested, nullptr);
#endif
}

// Atomically increments *value and returns the incremented value.
// \param value  Counter to increment; must not be null (asserted).
// \return       The new counter value, or -1 on targets without an
//               implementation (the counter is left untouched there).
long SyncHelper::increment(long* value)
{
    assert(value);

#if defined(TARGET_WIN)
    return ::InterlockedIncrement(reinterpret_cast<LONG*>(value));
#elif defined(TARGET_OSX)
    // NOTE(review): this operates on the counter as a 32-bit integer.
    return OSAtomicIncrement32(reinterpret_cast<int32_t*>(value));
#elif defined(TARGET_LINUX)
    // GCC/Clang builtin: full-barrier atomic add that yields the new value.
    return __sync_add_and_fetch(value, 1L);
#else
    return -1;
#endif
}

// ------------------- ThreadHelper -------------------

// Assigns a name to the calling thread.
// Only implemented for Linux; a failure is reported on stderr and otherwise
// ignored. On every other target the call is a no-op.
void ThreadHelper::setName(const std::string& name)
{
#if defined(TARGET_LINUX)
    const int retcode = pthread_setname_np(pthread_self(), name.c_str());
    if (retcode != 0)
    {
        std::cerr << "Failed to set Linux thread name" << std::endl;
    }
#else
    (void)name;
#endif
}

// Returns an identifier of the calling thread widened to 64 bits, or 0 on
// targets without an implementation.
uint64_t ThreadHelper::getCurrentId()
{
#if defined(TARGET_WIN)
    return static_cast<uint64_t>(GetCurrentThreadId());
#elif defined(TARGET_LINUX) || defined(TARGET_OSX) || defined(TARGET_ANDROID)
    // RPi builds want this!
    // A C-style cast is used on purpose: pthread_t is an integer on some
    // platforms and a pointer on others, and no single named cast fits both.
    return (uint64_t)(pthread_self());
#else
    return 0;
#endif
}

// ------------------- TimeHelper ---------------
using namespace std::chrono;

// Steady-clock reading (in milliseconds) captured during static initialization
static uint64_t TimestampStartPoint =
    duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();

// Wall-clock time (seconds since the epoch) captured during static initialization
static time_t TimestampBase = time(nullptr);

// Returns number of milliseconds starting from 01 Jan 1970 GMT.
// The value is the wall-clock time sampled at start-up plus the steady-clock
// time elapsed since then.
uint64_t chronox::getTimestamp()
{
    const uint64_t now_ms =
        duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();

    // Widen before multiplying so a 32-bit time_t cannot overflow.
    return now_ms - TimestampStartPoint + static_cast<uint64_t>(TimestampBase) * 1000;
}

// Returns the number of milliseconds elapsed on the steady clock since the
// static start point of this module was captured.
uint64_t chronox::getUptime()
{
    const uint64_t now_ms =
        duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
    return now_ms - TimestampStartPoint;
}

// Returns the distance from 'earlier' to 'later' for 32-bit counters.
// A counter wrap is recognised only when 'later' lies in the lower half of
// the range and 'earlier' in the upper half; any other combination where
// 'later' is not greater than 'earlier' yields 0.
uint32_t chronox::getDelta(uint32_t later, uint32_t earlier)
{
    if (later > earlier)
        return later - earlier;

    if (later < earlier && later < 0x7FFFFFFF && earlier >= 0x7FFFFFFF)
    {
        // Distance across the wrap point is (2^32 - earlier) + later, which is
        // exactly what unsigned subtraction produces.
        return later - earlier;
    }

    return 0;
}

// Converts a millisecond count into a timespec (seconds + nanoseconds).
timespec chronox::toTimespec(uint64_t milliseconds)
{
    timespec r;
    r.tv_sec = milliseconds / 1000;
    r.tv_nsec = milliseconds % 1000;
    r.tv_nsec *= 1000 * 1000;
    return r;
}

// Returns (a - b) in milliseconds; negative when 'a' precedes 'b'.
// Each operand is truncated to whole milliseconds before subtracting.
int64_t chronox::getDelta(const timespec& a, const timespec& b)
{
    const int64_t nanoseconds_per_millisecond = 1000000;

    const int64_t ms_a = static_cast<int64_t>(a.tv_sec) * 1000 + a.tv_nsec / nanoseconds_per_millisecond;
    const int64_t ms_b = static_cast<int64_t>(b.tv_sec) * 1000 + b.tv_nsec / nanoseconds_per_millisecond;
    return ms_a - ms_b;
}

// Returns (a - b) in milliseconds; negative when 'a' precedes 'b'.
int64_t chronox::getDelta(const timeval& a, const timeval& b)
{
    const int64_t diff_seconds = a.tv_sec - b.tv_sec;
    const int64_t diff_microseconds = a.tv_usec - b.tv_usec;

    return diff_seconds * 1000 + diff_microseconds / 1000;
}

// Remembers the construction time so getSpentTime() can report the elapsed time.
chronox::ExecutionTime::ExecutionTime()
{
    mStart = chronox::getTimestamp();
}

// Returns the number of milliseconds elapsed since construction.
uint64_t chronox::ExecutionTime::getSpentTime() const
{
    return chronox::getTimestamp() - mStart;
}

// --------------- BufferQueue -----------------
BufferQueue::BufferQueue()
{
}

BufferQueue::~BufferQueue()
{
}

// Copies 'bytes' bytes from 'data' into a new block, appends the block to the
// queue and wakes one thread waiting in pull().
// A non-positive size enqueues an empty block and 'data' is not read.
void BufferQueue::push(const void* data, int bytes)
{
    // Build the block before taking the lock to keep the critical section short.
    const size_t length = bytes > 0 ? static_cast<size_t>(bytes) : 0;
    PBlock b = std::make_shared<PBlock::element_type>();
    b->resize(length);
    if (length != 0 && data != nullptr)
        memcpy(b->data(), data, length);

    std::unique_lock<std::mutex> l(mMutex);
    mBlockList.push_back(b);
    mSignal.notify_one();
}

// Removes and returns the oldest block, waiting up to 'milliseconds' for one
// to arrive.
// \return The block, or an empty pointer if the queue is still empty when the
//         timeout expires.
BufferQueue::PBlock BufferQueue::pull(int milliseconds)
{
    std::unique_lock<std::mutex> l(mMutex);

    // The predicate form re-checks the queue after every wake-up, so spurious
    // wake-ups do not end the wait early and a block that arrives together
    // with the timeout is still delivered.
    const bool available = mSignal.wait_for(l, std::chrono::milliseconds(milliseconds),
                                            [this]() { return !mBlockList.empty(); });

    PBlock r;
    if (available)
    {
        r = mBlockList.front();
        mBlockList.pop_front();
    }
    return r;
}

// ----------------- Semaphore ---------------------

// Creates a semaphore holding 'count' initial permits.
Semaphore::Semaphore(unsigned int count)
    : m_count(count)
{
}

// Adds one permit and wakes a single waiter.
void Semaphore::notify()
{
    std::unique_lock<std::mutex> lock(m_mtx);
    m_count++;
    m_cv.notify_one();
}

// Blocks until a permit is available, then consumes it.
void Semaphore::wait()
{
    std::unique_lock<std::mutex> lock(m_mtx);
    m_cv.wait(lock, [this]() { return m_count > 0; });
    m_count--;
}

// Waits up to 'milliseconds' for a permit.
// \return true if a permit was consumed, false if the wait timed out.
bool Semaphore::waitFor(int milliseconds)
{
    std::unique_lock<std::mutex> lock(m_mtx);
    if (!m_cv.wait_for(lock, std::chrono::milliseconds(milliseconds), [this]() { return m_count > 0; }))
        return false;

    m_count--;
    return true;
}

// ------------------- TimerQueue -------------------

// Starts the worker thread that waits for and executes timers.
TimerQueue::TimerQueue()
{
    m_th = std::thread([this] { run(); });
}

// Cancels every pending timer, asks the worker thread to stop and joins it.
TimerQueue::~TimerQueue()
{
    cancelAll();

    // Abusing the timer queue to trigger the shutdown.
    add(std::chrono::milliseconds(0), [this](bool) { m_finish = true; });
    m_th.join();
}

//! Adds a new timer
// \param milliseconds  Delay, counted from now, after which the handler fires.
// \param handler       Callback; it receives true when it is being run
//                      because the timer was cancelled, false when it expired.
// \return
//  Returns the ID of the new timer. You can use this ID to cancel the
// timer
uint64_t TimerQueue::add(std::chrono::milliseconds milliseconds, std::function<void(bool)> handler)
{
    WorkItem item;
    item.end = Clock::now() + milliseconds;
    item.handler = std::move(handler);

    std::unique_lock<std::mutex> lk(m_mtx);
    uint64_t id = ++m_idcounter;
    item.id = id;
    m_items.push(std::move(item));
    lk.unlock();

    // Something changed, so wake up timer thread
    m_checkWork.notify();
    return id;
}

//! Cancels the specified timer
// \return
//  1 if the timer was cancelled.
//  0 if you were too late to cancel (or the timer ID was never valid to
// start with)
size_t TimerQueue::cancel(uint64_t id)
{
    // Instead of removing the item from the container (thus breaking the
    // heap integrity), we set the item as having no handler, and put
    // that handler on a new item at the top for immediate execution
    // The timer thread will then ignore the original item, since it has no
    // handler.
    std::unique_lock<std::mutex> lk(m_mtx);
    for (auto&& item : m_items.getContainer())
    {
        if (item.id == id && item.handler)
        {
            WorkItem newItem;
            // Zero time, so it stays at the top for immediate execution
            newItem.end = Clock::time_point();
            newItem.id = 0;  // Means it is a canceled item

            // Move the handler from item to newitem.
            // Also, we need to manually set the handler to nullptr, since
            // the standard does not guarantee moving an std::function will
            // empty it. Some STL implementation will empty it, others will
            // not.
            newItem.handler = std::move(item.handler);
            item.handler = nullptr;

            // The push may invalidate the iteration, which is fine because
            // the loop is left right after it.
            m_items.push(std::move(newItem));

            lk.unlock();
            // Something changed, so wake up timer thread
            m_checkWork.notify();
            return 1;
        }
    }
    return 0;
}

//! Cancels all timers
// \return
//  The number of items held by the queue at the time of the call
size_t TimerQueue::cancelAll()
{
    // Setting all "end" to 0 (for immediate execution) is ok,
    // since it maintains the heap integrity
    std::unique_lock<std::mutex> lk(m_mtx);
    for (auto&& item : m_items.getContainer())
    {
        if (item.id)
        {
            item.end = Clock::time_point();
            item.id = 0;
        }
    }
    auto ret = m_items.size();

    lk.unlock();
    m_checkWork.notify();
    return ret;
}

// Body of the worker thread: sleeps until the earliest timer is due (or until
// the queue changes) and then executes whatever has expired. Runs until
// m_finish is set.
void TimerQueue::run()
{
    ThreadHelper::setName("TimerQueue");

    while (!m_finish)
    {
        auto end = calcWaitTime();
        if (end.first)
        {
            // Timers found, so wait until it expires (or something else
            // changes)
            long long remaining = std::chrono::duration_cast<std::chrono::milliseconds>
                    (end.second - std::chrono::steady_clock::now()).count();

            // Semaphore::waitFor() takes an int: clamp rather than let a far
            // away deadline wrap around during narrowing.
            if (remaining < 0)
                remaining = 0;
            if (remaining > INT_MAX)
                remaining = INT_MAX;

            m_checkWork.waitFor(static_cast<int>(remaining));
        }
        else
        {
            // No timers exist, so wait forever until something changes
            m_checkWork.wait();
        }

        // Check and execute as much work as possible, such as, all expired
        // timers
        checkWork();
    }

    // If we are shutting down, we should not have any items left,
    // since the shutdown cancels all items
    assert(m_items.size() == 0);
}

// Drops cancelled (handler-less) items from the top of the queue and reports
// when the next live timer is due.
// \return {true, deadline} if a timer is pending, otherwise {false, zero time}.
std::pair<bool, TimerQueue::Clock::time_point> TimerQueue::calcWaitTime()
{
    std::lock_guard<std::mutex> lk(m_mtx);
    while (m_items.size())
    {
        if (m_items.top().handler)
        {
            // Item present, so return the new wait time
            return std::make_pair(true, m_items.top().end);
        }
        else
        {
            // Discard empty handlers (they were cancelled)
            m_items.pop();
        }
    }

    // No items found, so return no wait time (causes the thread to wait
    // indefinitely)
    return std::make_pair(false, Clock::time_point());
}

// Executes every item whose deadline has passed. The handler is invoked with
// the queue mutex released, and receives true if the item had been cancelled.
void TimerQueue::checkWork()
{
    std::unique_lock<std::mutex> lk(m_mtx);
    while (m_items.size() && m_items.top().end <= Clock::now())
    {
        WorkItem item(std::move(m_items.top()));
        m_items.pop();

        // Run the callback unlocked so it may call back into the queue.
        lk.unlock();
        if (item.handler)
            item.handler(item.id == 0);
        lk.lock();
    }
}

// Orders work items by deadline only.
bool TimerQueue::WorkItem::operator > (const TimerQueue::WorkItem& other) const
{
    return end > other.end;
}

// Exposes the container underlying the priority queue so that items can be
// inspected and modified in place.
std::vector<TimerQueue::WorkItem>& TimerQueue::Queue::getContainer()
{
    return this->c;
}