00001 #include "MultiThreadSimEngine.h"
00002 #include "SimNetwork.h"
00003
00004 #include <iostream>
00005 using std::cerr;
00006 using std::endl;
00007
00008 SimEngineAdvanceJob::SimEngineAdvanceJob(SingleThreadSimEngine *eng, int numSteps) :
00009 engine(eng), nSteps(numSteps)
00010 {}
00011
00012 void SimEngineAdvanceJob::setNumSteps(int nSteps)
00013 {
00014 this->nSteps = nSteps;
00015 }
00016
00017 void SimEngineAdvanceJob::start()
00018 {
00019 engine->advance(nSteps);
00020 }
00021
00022 SimEngineResetJob::SimEngineResetJob(SingleThreadSimEngine *eng)
00023 {
00024 this->eng = eng;
00025 }
00026
00027 void SimEngineResetJob::start()
00028 {
00029 eng->reset();
00030 }
00031
00032 SimEngineInitializeJob::SimEngineInitializeJob(SingleThreadSimEngine *eng)
00033 {
00034 this->eng = eng;
00035 }
00036
00037 void SimEngineInitializeJob::start()
00038 {
00039 eng->initialize();
00040 }
00041
00042
00043
00044 void CycleAdvanceJob::start()
00045 {
00046 vector< AnalogDelayObject *>::const_iterator it;
00047 for (it = analog_delay_objects->begin(); it != analog_delay_objects->end(); ++it)
00048 (*it)->advanceCycle();
00049 }
00050
00051 MultiThreadSimEngine::MultiThreadSimEngine(int ID,
00052 int numThreads,
00053 ThreadPool &thrPool,
00054 SpikeScheduler &scheduler,
00055 AnalogMessageDispatcherVector &analogDispatchers,
00056 SimNetwork &net)
00057 : SimEngine(ID, scheduler, net), _numThreads(numThreads),
00058 thr_pool(thrPool)
00059 {
00060 advance_jobs.reserve(_numThreads - 1);
00061 reset_jobs.reserve(_numThreads - 1);
00062 initialize_jobs.reserve(_numThreads - 1);
00063 mtCycleAdvanceJobs.reserve(_numThreads - 1);
00064 engines.reserve(_numThreads);
00065 analogDelayObjects.resize(_numThreads);
00066 thr_barrier = new boost::barrier(_numThreads);
00067 for (int i = 0; i < _numThreads; ++i) {
00068 analogDispatchers[i].setBarrier(thr_barrier);
00069 engines.push_back(new SingleThreadSimEngine(i, spikeScheduler, analogDispatchers[i], net));
00070 if (i < _numThreads - 1) {
00071 SimEngineAdvanceJob the_advance_job(engines[i], net.simParameter().minDelay.in_steps( net.get_dt() ));
00072 advance_jobs.push_back(the_advance_job);
00073 SimEngineResetJob the_reset_job(engines[i]);
00074 reset_jobs.push_back(the_reset_job);
00075 SimEngineInitializeJob the_initialize_job(engines[i]);
00076 initialize_jobs.push_back(the_initialize_job);
00077 }
00078 CycleAdvanceJob the_cycle_advance_job(&analogDelayObjects[i]);
00079 mtCycleAdvanceJobs.push_back(the_cycle_advance_job);
00080 }
00081 }
00082
00083 MultiThreadSimEngine::~MultiThreadSimEngine()
00084 {
00085 for (int i = 0 ; i < _numThreads ; ++i) {
00086 delete engines[i];
00087 }
00088 delete thr_barrier;
00089 }
00090
00091 void MultiThreadSimEngine::seed( uint32 noiseSeed )
00092 {
00093 for (int i = 0 ; i < _numThreads ; ++i) {
00094 engines[i]->seed( noiseSeed );
00095 }
00096 }
00097
00098 void MultiThreadSimEngine::seed( vector<uint32> const& sim_seeds )
00099 {
00100 for (int i = 0 ; i < _numThreads ; ++i) {
00101 engines[i]->seed( sim_seeds[i] );
00102 }
00103 }
00104
00105 void MultiThreadSimEngine::noiseRandEngineOutput( vector<uint32> & r )
00106 {
00107 r.resize( _numThreads );
00108 for (int i = 0 ; i < _numThreads ; ++i) {
00109 unsigned v = (unsigned)((*(engines[i]->noiseRandomEngine()))() * 200000);
00110
00111 r[i] = v;
00112 }
00113 }
00114
00115 void MultiThreadSimEngine::mount( const SimObjectFactory &objFactory, const SimObject::ID &mountpoint, SimObject::ID &id )
00116 {
00117 addObject( objFactory, id );
00118 one_way_link( id, mountpoint );
00119 }
00120
00121 void MultiThreadSimEngine::insert( const SimObjectFactory &objFactory, const SimObject::ID &mountpoint, SimObject::ID &id )
00122 {
00123 addObject( objFactory, id );
00124 two_way_link( id, mountpoint );
00125 }
00126
00127 void MultiThreadSimEngine::two_way_link( const SimObject::ID &id1, const SimObject::ID &id2 )
00128 {
00129 SimObject *obj1 = engines[id1.eng]->getObject( id1 );
00130 SimObject *obj2 = engines[id2.eng]->getObject( id2 );
00131
00132 obj1->incoming( obj2, id2, id1, network );
00133 obj2->outgoing( obj1, id1, id2, network );
00134
00135 obj2->incoming( obj1, id1, id2, network );
00136 obj1->outgoing( obj2, id2, id1, network );
00137 }
00138
00139 void MultiThreadSimEngine::one_way_link( const SimObject::ID &src, const SimObject::ID &dst )
00140 {
00141 SimObject *s = engines[src.eng]->getObject( src );
00142 SimObject *d = engines[dst.eng]->getObject( dst );
00143
00144 d->incoming( s, src, dst, network );
00145 s->outgoing( d, dst, src, network );
00146 }
00147
00148 void MultiThreadSimEngine::reset()
00149 {
00150 for (int thr = 0 ; thr < _numThreads-1 ; thr++)
00151 thr_pool.dispatch(thr, reset_jobs[thr]);
00152 engines[_numThreads-1]->reset();
00153 thr_pool.waitAll();
00154 currentStepWithinCycle = 0;
00155 }
00156
00157 void MultiThreadSimEngine::initialize()
00158 {
00159 for (int thr = 0 ; thr < _numThreads-1 ; thr++) {
00160 thr_pool.dispatch(thr, initialize_jobs[thr]);
00161 }
00162 engines[_numThreads-1]->initialize();
00163 thr_pool.waitAll();
00164 }
00165
00166
00167 void MultiThreadSimEngine::advance(int nSteps)
00168 {
00169 int nCycles;
00170 int reminderSteps;
00171 int interval = default_steps_per_cycle;
00172 int leftStepsToFinishPreviousCycle;
00173
00174 if (currentStepWithinCycle) {
00175 if (nSteps + currentStepWithinCycle >= interval) {
00176 leftStepsToFinishPreviousCycle = default_steps_per_cycle - currentStepWithinCycle;
00177 advanceSeveralStepsWithinACycle(leftStepsToFinishPreviousCycle);
00178 finalizeCycle();
00179 } else {
00180 leftStepsToFinishPreviousCycle = nSteps;
00181 advanceSeveralStepsWithinACycle(leftStepsToFinishPreviousCycle);
00182 }
00183 nSteps -=leftStepsToFinishPreviousCycle;
00184 }
00185
00186
00187 reminderSteps = nSteps % interval;
00188 nCycles = (nSteps - reminderSteps) / interval ;
00189 for (int c = 0 ; c < nCycles ; ++c ) {
00190 advanceOneFullCycle();
00191 }
00192
00193 if (reminderSteps > 0 )
00194 advanceSeveralStepsWithinACycle(reminderSteps);
00195
00196 currentStepWithinCycle = reminderSteps;
00197 }
00198
00199
00200 void MultiThreadSimEngine::advanceSeveralStepsWithinACycle(int steps)
00201 {
00202 for (int i = 0 ; i < _numThreads-1 ; i++ )
00203 advance_jobs[i].setNumSteps(steps);
00204
00205 for (int thr = 0 ; thr < _numThreads-1; ++thr)
00206 thr_pool.dispatch(thr, advance_jobs[thr]);
00207
00208 engines[_numThreads-1]->advance(steps);
00209 thr_pool.waitAll();
00210 }
00211
00212
00213 void MultiThreadSimEngine::finalizeCycle()
00214 {
00215 spikeScheduler.nextCycle();
00216
00217 for (int thr = 0; thr < _numThreads-1; ++thr) {
00218 if (analogDelayObjects[thr].size()) {
00219 thr_pool.dispatch(thr, mtCycleAdvanceJobs[thr]);
00220 }
00221 }
00222 if (analogDelayObjects[_numThreads-1].size()) {
00223 mtCycleAdvanceJobs[_numThreads-1].start();
00224 }
00225 thr_pool.waitAll();
00226 }
00227
00228 void MultiThreadSimEngine::advanceOneFullCycle()
00229 {
00230 advanceSeveralStepsWithinACycle(default_steps_per_cycle);
00231 finalizeCycle();
00232 }
00233
00234 SingleThreadSimEngine & MultiThreadSimEngine::getSTEngine(engineid_t eng)
00235 {
00236 return *engines[eng];
00237 }
00238
00239