00001 #include "MTDistributedIncomingSpikeScheduler.h" 00002 00003 MTDistributedIncomingSpikeScheduler::MTDistributedIncomingSpikeScheduler(int numThreads, 00004 ThreadPool &threadPool, 00005 MPIInputBufferVector &inputBuffers, 00006 GlobalMultiTargetDelayMap &globalDelayMap, 00007 vector<PropagatedSpikeBuffer*> &spikeBuffers, 00008 int cycleSteps) : 00009 nThreads(numThreads), thr_pool(threadPool) 00010 { 00011 incomingSpikeSchedulers.reserve(nThreads); 00012 jobs.reserve(nThreads-1); 00013 for (int i = 0 ; i < nThreads; ++i) { 00014 STDistributedIncomingSpikeScheduler *stIncomingSpikeScheduler = 00015 new STDistributedIncomingSpikeScheduler(inputBuffers, 00016 globalDelayMap[i], 00017 *spikeBuffers[i], 00018 cycleSteps); 00019 incomingSpikeSchedulers.push_back(stIncomingSpikeScheduler); 00020 if (i < nThreads - 1) { 00021 IncomingSpikeSchedulerThreadPoolJob *job = new IncomingSpikeSchedulerThreadPoolJob( 00022 stIncomingSpikeScheduler); 00023 jobs.push_back(job); 00024 } 00025 } 00026 } 00027 00028 MTDistributedIncomingSpikeScheduler::~MTDistributedIncomingSpikeScheduler() 00029 { 00030 for (int i = 0 ; i < nThreads; ++i) { 00031 delete incomingSpikeSchedulers[i]; 00032 if (i < nThreads-1) 00033 delete jobs[i]; 00034 } 00035 } 00036 00037 void MTDistributedIncomingSpikeScheduler::processMPIInputSpikeBuffers() 00038 { 00039 for (int i = 0 ; i < nThreads - 1 ; ++i) { 00040 thr_pool.dispatch(i, *jobs[i]); 00041 } 00042 incomingSpikeSchedulers[nThreads-1]->processMPIInputSpikeBuffers(); 00043 thr_pool.waitAll(); 00044 } 00045