00001 #include "ThreadPool.h"
00002 #include <iostream>
00003
00004 using std::cout ;
00005 using std::endl ;
00006 using std::cerr;
00007
00008 WorkerMainLoop::WorkerMainLoop(WorkerState *st,
00009 boost::condition &c1, boost::mutex &m1,
00010 boost::condition &c2, boost::mutex &m2)
00011 : cond1(c1), mtx1(m1), cond2(c2), mtx2(m2)
00012 {
00013 state = st;
00014 state->toBeFinished = false;
00015 }
00016
00017 void WorkerMainLoop::operator() ()
00018 {
00019 boost::mutex::scoped_lock l1(mtx1);
00020 boost::mutex::scoped_lock l2(mtx2);
00021 l1.unlock();
00022 cond1.notify_one();
00023 while (!state->toBeFinished) {
00024 cond2.wait(l2);
00025 l1.lock();
00026 l1.unlock();
00027 cond1.notify_one();
00028 if (state->job && !state->toBeFinished) {
00029 (*state->job).start();
00030 }
00031 }
00032 }
00033
00034 WorkerMainLoop::~WorkerMainLoop()
00035 {}
00036
00037 WorkerState::WorkerState()
00038 {
00039 job = 0;
00040 }
00041
00042 void WorkerState::finish()
00043 {
00044 toBeFinished = true ;
00045 }
00046
00047 void WorkerState::assignJob(ThreadPoolJob &j)
00048 {
00049 job = &j;
00050 }
00051
00052 boost::function0<void> _f_;
00053
00054 ThreadPool::ThreadPool(int maxNumThreads) : _maxNumThreads(maxNumThreads)
00055 {
00056 threads.resize(_maxNumThreads);
00057 workerstates.resize(_maxNumThreads);
00058 cond1.resize(_maxNumThreads);
00059 cond2.resize(_maxNumThreads);
00060 mtx1.resize(_maxNumThreads);
00061 mtx2.resize(_maxNumThreads);
00062
00063 for (int i = 0 ; i < _maxNumThreads ; i++) {
00064 cond1[i] = new condition ;
00065 cond2[i] = new condition ;
00066 mtx1[i] = new mutex;
00067 mtx2[i] = new mutex;
00068 workerstates[i] = new WorkerState();
00069 WorkerMainLoop worker(workerstates[i], *cond1[i], *mtx1[i], *cond2[i], *mtx2[i]);
00070 boost::function0<void> f;
00071 f = worker ;
00072 mutex::scoped_lock l(*mtx1[i]);
00073 try {
00074 threads[i] = new thread( f );
00075 } catch (boost::thread_resource_error &) {
00076 cout << "ERROR: Cannot start another thread" << endl;
00077 }
00078 cond1[i]->wait(l);
00079 mutex::scoped_lock l2(*mtx2[i]);
00080 }
00081 }
00082
00083 int ThreadPool::dispatch(int id, ThreadPoolJob &f)
00084 {
00085 workerstates[id]->assignJob(f);
00086 boost::mutex::scoped_lock l1(*mtx1[id]);
00087 cond2[id]->notify_one();
00088 cond1[id]->wait(l1);
00089 return 0;
00090 }
00091
00092 void ThreadPool::waitAll()
00093 {
00094 for (int i = 0 ; i < _maxNumThreads ; i++ )
00095 boost::mutex::scoped_lock l2(*mtx2[i]);
00096 }
00097
00098
00099 ThreadPool::~ThreadPool()
00100 {
00101 for (int i = 0 ; i < _maxNumThreads ; i++ ) {
00102 workerstates[i]->finish();
00103 cond2[i]->notify_one();
00104 threads[i]->join();
00105 delete workerstates[i];
00106 delete threads[i];
00107 delete cond1[i];
00108 delete cond2[i];
00109 delete mtx1[i];
00110 delete mtx2[i];
00111 }
00112 }