Boost C++ Libraries Home Libraries People FAQ More

PrevUpHomeNext

Running with worker threads

Keep workers running

If a worker thread is used but no fiber is created or parts of the framework (like this_fiber::yield()) are touched, then no fiber scheduler is instantiated.

auto worker = std::thread(
        []{
            // fiber scheduler not instantiated
        });
worker.join();

If use_scheduling_algorithm<>() is invoked, the fiber scheduler is created. If the worker thread simply returns, destroys the scheduler and terminates.

auto worker = std::thread(
        []{
            // fiber scheduler created
            boost::fibers::use_scheduling_algorithm<my_fiber_scheduler>();
        });
worker.join();

In order to keep the worker thread running, the fiber associated with the thread stack (which is called main fiber) is blocked. For instance the main fiber might wait on a condition_variable. For a gracefully shutdown condition_variable is signalled and the main fiber returns. The scheduler gets destructed if all fibers of the worker thread have been terminated.

boost::fibers::mutex mtx;
boost::fibers::condition_variable_any cv;
auto worker = std::thread(
        [&mtx,&cv]{
            mtx.lock();
            // suspend till signalled
            cv.wait(mtx);
            mtx.unlock();
        });
// signal termination
cv.notify_all();
worker.join();

Processing tasks

Tasks can be transferred via channels. The worker thread runs a pool of fibers that dequeue and executed tasks from the channel. The termination is signalled via closing the channel.

using task = std::function<void()>;
boost::fibers::buffered_channel<task> ch{1024};
auto worker = std::thread(
        [&ch]{
            // create pool of fibers
            for (int i=0; i<10; ++i) {
                boost::fibers::fiber{
                    [&ch]{
                        task tsk;
                        // dequeue and process tasks
                        while (boost::fibers::channel_op_status::closed!=ch.pop(tsk)){
                            tsk();
                        }
                    }}.detach();
            }
            task tsk;
            // dequeue and process tasks
            while (boost::fibers::channel_op_status::closed!=ch.pop(tsk)){
                tsk();
            }
        });
// feed channel with tasks
ch.push([]{ ... });
...
// signal termination
ch.close();
worker.join();

An alternative is to use a work-stealing scheduler. This kind of scheduling algorithm a worker thread steals fibers from the ready-queue of other worker threads if its own ready-queue is empty.

[Note] Note

Wait till all worker threads have registered the work-stealing scheduling algorithm.

boost::fibers::mutex mtx;
boost::fibers::condition_variable_any cv;
// start wotrker-thread first
auto worker = std::thread(
        [&mtx,&cv]{
            boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(2);
            mtx.lock();
            // suspend main-fiber from the worker thread
            cv.wait(mtx);
            mtx.unlock();
        });
boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(2);
// create fibers with tasks
boost::fibers::fiber f{[]{ ... }};
...
// signal termination
cv.notify_all();
worker.join();
[Important] Important

Because the TIB (thread information block on Windows) is not fully described in the MSDN, it might be possible that not all required TIB-parts are swapped. Using WinFiber implementation might be an alternative (see documentation about implementations fcontext_t, ucontext_t and WinFiber of boost.context).


PrevUpHomeNext