00001 #include "MTDistributedIncomingSpikeScheduler.h"
00002
00003 MTDistributedIncomingSpikeScheduler::MTDistributedIncomingSpikeScheduler(int numThreads,
00004 ThreadPool &threadPool,
00005 MPIInputBufferVector &inputBuffers,
00006 GlobalMultiTargetDelayMap &globalDelayMap,
00007 vector<PropagatedSpikeBuffer*> &spikeBuffers,
00008 int cycleSteps) :
00009 nThreads(numThreads), thr_pool(threadPool)
00010 {
00011 incomingSpikeSchedulers.reserve(nThreads);
00012 jobs.reserve(nThreads-1);
00013 for (int i = 0 ; i < nThreads; ++i) {
00014 STDistributedIncomingSpikeScheduler *stIncomingSpikeScheduler =
00015 new STDistributedIncomingSpikeScheduler(inputBuffers,
00016 globalDelayMap[i],
00017 *spikeBuffers[i],
00018 cycleSteps);
00019 incomingSpikeSchedulers.push_back(stIncomingSpikeScheduler);
00020 if (i < nThreads - 1) {
00021 IncomingSpikeSchedulerThreadPoolJob *job = new IncomingSpikeSchedulerThreadPoolJob(
00022 stIncomingSpikeScheduler);
00023 jobs.push_back(job);
00024 }
00025 }
00026 }
00027
00028 MTDistributedIncomingSpikeScheduler::~MTDistributedIncomingSpikeScheduler()
00029 {
00030 for (int i = 0 ; i < nThreads; ++i) {
00031 delete incomingSpikeSchedulers[i];
00032 if (i < nThreads-1)
00033 delete jobs[i];
00034 }
00035 }
00036
00037 void MTDistributedIncomingSpikeScheduler::processMPIInputSpikeBuffers()
00038 {
00039 for (int i = 0 ; i < nThreads - 1 ; ++i) {
00040 thr_pool.dispatch(i, *jobs[i]);
00041 }
00042 incomingSpikeSchedulers[nThreads-1]->processMPIInputSpikeBuffers();
00043 thr_pool.waitAll();
00044 }
00045