ThreadObject.hpp

#pragma once
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <atomic>
#include <future>



namespace Utils
{
    /// Runs a callback on a dedicated worker thread, either repeatedly with a
    /// pause between runs or exactly once.
    ///
    /// Pauses are given in milliseconds: kInfinite (-1) sleeps until SetEvent(),
    /// NotifyExit() or Stop() is called, kNoWait (0) does not sleep at all, and
    /// a positive value sleeps for at most that long.
    ///
    /// The stop and wake-up flags are always changed while holding m_mtLock and
    /// every wait uses a predicate, so a notification sent just before the
    /// worker starts waiting is not lost and spurious wake-ups are ignored.
    ///
    /// Start*/Stop() assign and join the thread handle without synchronization,
    /// so they are meant to be driven from one controlling thread. Stop() joins
    /// the worker and therefore must not be called from inside the callback;
    /// use NotifyExit() there instead.
    class ThreadObject
    {
        using Func = std::function<void()>;
        const static int64_t kInfinite = -1;
        const static int64_t kNoWait = 0;

    public:
        /// Creates an idle object; no thread is started until a Start* call.
        ThreadObject()
            : m_bStop(true)
            , m_bWork(false)
            , m_nScheduleTimer(kInfinite)
            , m_bWakeup(false)
        {
        }

        /// Stops and joins the worker thread, if any.
        ~ThreadObject()
        {
            Stop();
        }

        /// Runs func immediately and then again after every pause.
        /// @param func         callback executed on the worker thread (may be empty).
        /// @param nWaitMSeond  pause between runs in milliseconds (see class notes).
        void Start(Func func, int64_t nWaitMSeond = kInfinite)
        {
            do_work(func, nWaitMSeond);
        }

        /// Same as Start(Func, int64_t) but binds f to args first.
        /// @param nWaitMSeond  pause between runs in milliseconds.
        /// @param f            callable to invoke.
        /// @param args         arguments copied into the bound call.
        template<class F, class... Args>
        void Start(int64_t nWaitMSeond, F&& f, Args&&... args)
        {
            auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
            do_work([task]() { task(); }, nWaitMSeond);
        }

        /// Waits once (as described by nWaitMSeond) and then runs func a single
        /// time, unless a stop was requested in the meantime. The object
        /// reports "not running" once func has returned.
        /// @param func         callback executed on the worker thread (may be empty).
        /// @param nWaitMSeond  delay before the call in milliseconds.
        void StartOnce(Func func, int64_t nWaitMSeond = kNoWait)
        {
            Stop();
            m_bStop.store(false);
            m_thWork = std::thread([this, func, nWaitMSeond] {
                // The mutex is only held inside wait_for_signal(), so func may
                // safely call SetEvent()/NotifyExit() on this object.
                wait_for_signal(nWaitMSeond);
                if (m_bStop.load()) return;
                if (func)
                {
                    func();
                }
                m_bStop.store(true);
                });
        }

        /// Like Start(), but the first run of func happens only after the
        /// first pause (or wake-up) instead of immediately.
        void StartAndWait(Func func, int64_t nWaitMSeond = kInfinite)
        {
            do_work(func, nWaitMSeond, true);
        }

        /// Requests the worker to exit and blocks until it has finished.
        void Stop()
        {
            NotifyExit();
            if (m_thWork.joinable())
                m_thWork.join();
        }

        /// Requests the worker to exit without waiting for it.
        void NotifyExit()
        {
            {
                // Publish the flag under the lock so a worker that is between
                // its predicate check and the actual wait cannot miss it.
                std::lock_guard<std::mutex> lock(m_mtLock);
                m_bStop.store(true);
            }
            m_cvCond.notify_one();
        }

        /// Ends the current pause early. An event raised while the callback is
        /// running is remembered and ends the next pause immediately.
        void SetEvent()
        {
            {
                std::lock_guard<std::mutex> lock(m_mtLock);
                m_bWakeup.store(true);
            }
            m_cvCond.notify_one();
        }

        /// @return true while a stop has not been requested and, for
        ///         StartOnce(), the callback has not finished yet.
        bool IsRunning() const
        {
            return (!m_bStop.load());
        }

        /// Changes the pause used by Start()/StartAndWait(). The new value is
        /// read the next time the worker begins a pause; a pause already in
        /// progress is not interrupted.
        void UpdateScheduleTimer(int64_t nWaitMecond)
        {
            m_nScheduleTimer.store(nWaitMecond);
        }


    private:
        ThreadObject(const ThreadObject&) = delete;
        ThreadObject& operator=(const ThreadObject&) = delete;

        ThreadObject(ThreadObject&&) = delete;
        ThreadObject& operator=(ThreadObject&&) = delete;


    private:
        /// Pauses the calling worker thread.
        /// kInfinite blocks until a stop or wake-up is flagged, a positive
        /// value blocks for at most that many milliseconds, anything else
        /// returns at once. A pending wake-up is consumed after a real wait.
        void wait_for_signal(int64_t nWaitMSeond)
        {
            std::unique_lock<std::mutex> lock(m_mtLock);
            const auto signalled = [this] { return m_bStop.load() || m_bWakeup.load(); };
            if (kInfinite == nWaitMSeond)
            {
                m_cvCond.wait(lock, signalled);
            }
            else if (nWaitMSeond > 0)
            {
                m_cvCond.wait_for(lock, std::chrono::milliseconds(nWaitMSeond), signalled);
            }
            else
            {
                return;
            }
            m_bWakeup.store(false);
        }

        /// Restarts the worker thread in repeating mode.
        /// @param func         callback to run on every cycle.
        /// @param nWaitMSeond  initial pause between cycles in milliseconds.
        /// @param bWait        when true the first cycle skips the callback.
        void do_work(Func func, int64_t nWaitMSeond, bool bWait = false)
        {
            // Join the previous worker first so it never observes the new settings.
            Stop();
            m_nScheduleTimer.store(nWaitMSeond);
            m_bStop.store(false);
            m_bWork.store(!bWait);
            m_thWork = std::thread([this, func] {
                while (!m_bStop.load())
                {
                    if (m_bWork.load())
                    {
                        if (func)
                        {
                            func();
                        }
                    }
                    else
                    {
                        // First cycle of StartAndWait(): skip the call once.
                        m_bWork.store(true);
                    }
                    if (!m_bStop.load())
                    {
                        wait_for_signal(m_nScheduleTimer.load());
                    }
                }
                }
            );
        }


    private:
        std::atomic<bool> m_bStop;              // true = no worker should be running
        std::atomic<bool> m_bWork;              // false only until the first cycle of StartAndWait()
        std::condition_variable m_cvCond;
        std::mutex m_mtLock;                    // guards changes to m_bStop/m_bWakeup seen by waits
        std::thread m_thWork;
        std::atomic<int64_t> m_nScheduleTimer;  // pause between cycles, in milliseconds
        std::atomic<bool> m_bWakeup;            // set by SetEvent(), consumed by the worker

    };
}


MessageQueue.hpp

/// Thread-safe double-ended queue with an optional consumer thread.
///
/// Producers call PushBack()/PushFront(); after Start() a worker thread takes
/// elements from the front one at a time and hands each to a callback. The
/// callbacks run on the worker thread without the queue lock held, so they may
/// push to or pop from the queue themselves. Stop() joins the worker and
/// therefore must not be called from inside a callback.
///
/// Stopping does not clear the queue; elements that were not consumed stay in
/// it and can be fetched with PopFront()/PopBack() or discarded with Clear().
template<class TData>
class MessageQueue
{
    using CallbackSignal = std::function<void(const TData& data)>;
    using CallbackTimeout = std::function<void()>;

    const static int kInfinite = -1;
public:
    MessageQueue()
        : m_bStop(false)
    {

    }

    ~MessageQueue()
    {
        Stop();
    }

    /// Starts (or restarts) the consumer thread; it blocks until data arrives.
    /// @param cbNotify  invoked once for every element taken from the front.
    void Start(CallbackSignal cbNotify)
    {
        assert(cbNotify);
        launch(std::move(cbNotify), CallbackTimeout(), kInfinite);
    }

    /// Starts (or restarts) the consumer thread with a timeout callback.
    /// @param cbSignal      invoked once for every element taken from the front.
    /// @param cbTimeout     invoked when no element arrived within the timeout
    ///                      (may be empty).
    /// @param nWaitMSecond  timeout in milliseconds; -1 waits without a
    ///                      timeout, in which case cbTimeout is never invoked.
    void Start(CallbackSignal cbSignal, CallbackTimeout cbTimeout, int nWaitMSecond)
    {
        assert(cbSignal);
        launch(std::move(cbSignal), std::move(cbTimeout), nWaitMSecond);
    }

    /// Asks the consumer thread to exit and waits for it to finish.
    void Stop()
    {
        {
            // Set the flag under the queue lock so a worker that has just
            // evaluated its wait predicate cannot miss the notification.
            std::lock_guard<std::mutex> lock(m_mtLock);
            m_bStop.store(true);
        }
        m_cvSignal.notify_one();
        if (m_thRun.joinable())
        {
            m_thRun.join();
        }
    }

    /// Inserts a copy of data at the front and wakes the consumer.
    void PushFront(const TData& data)
    {
        std::lock_guard<std::mutex> lock(m_mtLock);
        m_listData.emplace_front(data);

        m_cvSignal.notify_one();
    }

    /// Moves the front element into data.
    /// @return false (data untouched) when the queue is empty.
    bool PopFront(TData& data)
    {
        std::lock_guard<std::mutex> lock(m_mtLock);
        if (m_listData.empty()) return false;
        data = std::move(m_listData.front());
        m_listData.pop_front();

        return true;
    }

    /// Appends a copy of data at the back and wakes the consumer.
    void PushBack(const TData& data)
    {
        std::lock_guard<std::mutex> lock(m_mtLock);
        m_listData.emplace_back(data);

        m_cvSignal.notify_one();
    }

    /// Moves the back element into data.
    /// @return false (data untouched) when the queue is empty.
    bool PopBack(TData& data)
    {
        std::lock_guard<std::mutex> lock(m_mtLock);
        if (m_listData.empty()) return false;
        data = std::move(m_listData.back());
        m_listData.pop_back();

        return true;
    }

    /// Removes every queued element.
    void Clear()
    {
        std::lock_guard<std::mutex> lock(m_mtLock);
        m_listData.clear();
    }

private:
    MessageQueue(const MessageQueue&) = delete;
    MessageQueue& operator=(const MessageQueue&) = delete;

    MessageQueue(MessageQueue&&) = delete;
    MessageQueue& operator=(MessageQueue&&) = delete;


private:
    /// Shared worker loop behind both Start() overloads.
    /// The wait uses m_mtLock - the same mutex that guards m_listData - so the
    /// emptiness check, the wait and the removal form one critical section.
    void launch(CallbackSignal cbSignal, CallbackTimeout cbTimeout, int nWaitMSecond)
    {
        Stop();
        m_bStop.store(false);
        m_thRun = std::thread([this, cbSignal, cbTimeout, nWaitMSecond] {
            for (;;)
            {
                // The front node is spliced into this local list under the
                // lock; the callback then runs on it with the lock released.
                std::list<TData> taken;
                {
                    std::unique_lock<std::mutex> lock(m_mtLock);
                    const auto ready = [this] { return !m_listData.empty() || m_bStop.load(); };
                    if (kInfinite == nWaitMSecond)
                    {
                        m_cvSignal.wait(lock, ready);
                    }
                    else
                    {
                        m_cvSignal.wait_for(lock, std::chrono::milliseconds(nWaitMSecond), ready);
                    }
                    if (m_bStop.load()) break;
                    if (!m_listData.empty())
                    {
                        taken.splice(taken.begin(), m_listData, m_listData.begin());
                    }
                }

                if (!taken.empty())
                {
                    // An element taken without a handler is dropped; Start()
                    // asserts that a handler is supplied.
                    if (cbSignal)
                    {
                        cbSignal(taken.front());
                    }
                }
                else if (cbTimeout)
                {
                    cbTimeout();
                }
            }
            }
        );
    }


private:
    std::atomic<bool> m_bStop;
    std::mutex m_mtLock;              // guards m_listData and pairs with m_cvSignal
    std::list<TData> m_listData;
    std::condition_variable m_cvSignal;
    std::thread m_thRun;

};

main.cpp

#include <iostream>
#include <vector>
#include <map>
#include <string>
#include "ThreadObject.hpp"
#include "MessageQueue.hpp"
#include <windows.h>


/// Command record passed through the message queue as a raw pointer.
/// Every member has a default so that a plain `new CommandInfo` no longer
/// yields indeterminate pointers.
struct CommandInfo
{
    BYTE* lpBuffer = nullptr;          // payload bytes; MyClass stores a heap-allocated TestData here
    DWORD dwCmdID = 0;                 // command identifier chosen by the producer
    CommandInfo* pPreCmd = nullptr;    // presumably the previous command in a chain - not used in this file
    CommandInfo* pNextCmd = nullptr;   // presumably the next command in a chain - not used in this file
};

/// Small sample payload: two integers and a Print() hook.
struct TestData
{
    int nNum{0};
    int nDate{1};

    /// Currently prints nothing; the trace statement is kept disabled below.
    void Print() const
    {
        // std::cout << "Data: " << nNum << "-" << nDate << "\n";
    }
};

/// Sample record holding an index, a string and a list of integers.
struct MyData
{
    int nIndex = 0;
    std::string strData;
    std::vector<int> vecData;

    /// Writes "Index: <nIndex>[" to std::cout, then every element of vecData
    /// followed by a comma, then "]" and an empty line.
    void Print() const
    {
        std::cout << "Index: " << nIndex << "[";
        for (auto it = vecData.cbegin(); it != vecData.cend(); ++it)
        {
            std::cout << *it << ",";
        }
        std::cout << "]\n\n";
    }
};

class MyClass
{
public:
    MyClass()
        : m_nIndex(0)
    {
        //m_queMsg.Start(std::bind(&MyClass::do_print_data, this, std::placeholders::_1));
        m_queMsg.Start(std::bind(&MyClass::PopData, this, std::placeholders::_1));
        /*m_objThread1.Start(std::bind(&MyClass::run, this), 40);
        m_objThread.StartAndWait(std::bind(&MyClass::PushData, this), 50);*/
        
        m_objThread1.Start(std::bind(&MyClass::run, this), 40);
        m_objThread.StartAndWait(std::bind(&MyClass::PushData, this), 60);
    }

    ~MyClass()
    {
        Stop();
    }

    void Stop()
    {
        m_objThread1.Stop();
        m_objThread.Stop();
        m_queMsg.Stop();
    }

private:
    void run()
    {
        std::cout << "This is-- " << 999 << "--test\n";
        TestData* pData = new TestData;
        pData->nDate = 999;
        pData->nNum = 999;

        CommandInfo* cmd = new CommandInfo;
        cmd->lpBuffer = (BYTE*)pData;
        cmd->dwCmdID = 999;

        m_queMsg.PushBack(cmd);
        m_objThread.SetEvent();
        //std::cout << "This is-- " << ++m_nIndex << "--test\n";
        //
        ////m_queMsg.PushBack(m_nIndex);
        //MyData dataTmp;
        //dataTmp.nIndex = m_nIndex;
        //dataTmp.strData = std::to_string(m_nIndex);
        //for (int i = 0;i < m_nIndex;++i)
        //{
        //    dataTmp.vecData.push_back(i);
        //}
        //std::cout << sizeof(dataTmp) << std::endl;
        //m_queMsg.PushBack(dataTmp);
    }

    void do_print_data(/*const int& nData*/const MyData& data)
    {
        //std::cout << "[Callback] Fetch data " << nData << "\n";
        data.Print();
    }

    void PushData()
    {
        std::cout << "This is-- " << ++m_nIndex << "--test\n";
        TestData* pData = new TestData;
        pData->nDate = m_nIndex;
        pData->nNum = m_nIndex;

        CommandInfo* cmd = new CommandInfo;
        cmd->lpBuffer = (BYTE*)pData;
        cmd->dwCmdID = m_nIndex;

        m_queMsg.PushBack(cmd);
    }

    void PopData(CommandInfo* pCmd)
    {
        TestData* pData = (TestData*)pCmd->lpBuffer;
        pData->Print();

        delete pCmd->lpBuffer;
        pCmd->lpBuffer = nullptr;

        delete pCmd;
        pCmd = nullptr;
    }

private:
    MyClass(const MyClass&) = delete;
    MyClass& operator=(const MyClass&) = delete;

    MyClass(const MyClass&&) = delete;
    MyClass& operator=(const MyClass&&) = delete;


private:
    int m_nIndex;
    ThreadObject m_objThread;
    ThreadObject m_objThread1;
    //MessageQueue<int> m_queMsg;
    //MessageQueue<MyData> m_queMsg;
    MessageQueue<CommandInfo*> m_queMsg;
};


int main()
{
    std::cout << sizeof(MyData) << std::endl;
    MyClass objTest;
    std::this_thread::sleep_for(std::chrono::milliseconds(500));

    objTest.Stop();
    getchar();
    std::cout << "Application exit normal\n";

    return 0;
}
Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐