MultiThreadSimEngine.cpp

Go to the documentation of this file.
00001 #include "MultiThreadSimEngine.h"
00002 #include "SimNetwork.h"
00003 
00004 #include <iostream>
00005 using std::cerr;
00006 using std::endl;
00007 
00008 SimEngineAdvanceJob::SimEngineAdvanceJob(SingleThreadSimEngine *eng, int numSteps) :
00009         engine(eng), nSteps(numSteps)
00010 {}
00011 
00012 void SimEngineAdvanceJob::setNumSteps(int nSteps)
00013 {
00014     this->nSteps = nSteps;
00015 }
00016 
00017 void SimEngineAdvanceJob::start()
00018 {
00019     engine->advance(nSteps);
00020 }
00021 
00022 SimEngineResetJob::SimEngineResetJob(SingleThreadSimEngine *eng)
00023 {
00024     this->eng = eng;
00025 }
00026 
00027 void SimEngineResetJob::start()
00028 {
00029     eng->reset();
00030 }
00031 
00032 SimEngineInitializeJob::SimEngineInitializeJob(SingleThreadSimEngine *eng)
00033 {
00034     this->eng = eng;
00035 }
00036 
00037 void SimEngineInitializeJob::start()
00038 {
00039     eng->initialize();
00040 }
00041 
00042 
00043 
00044 void CycleAdvanceJob::start()
00045 {
00046     vector< AnalogDelayObject *>::const_iterator it;
00047     for (it = analog_delay_objects->begin(); it != analog_delay_objects->end(); ++it)
00048         (*it)->advanceCycle();
00049 }
00050 
00051 MultiThreadSimEngine::MultiThreadSimEngine(int ID,
00052         int numThreads,
00053         ThreadPool &thrPool,
00054         SpikeScheduler &scheduler,
00055         AnalogMessageDispatcherVector &analogDispatchers,
00056         SimNetwork &net)
00057         : SimEngine(ID, scheduler, net), _numThreads(numThreads),
00058         thr_pool(thrPool)
00059 {
00060     advance_jobs.reserve(_numThreads - 1);
00061     reset_jobs.reserve(_numThreads - 1);
00062     initialize_jobs.reserve(_numThreads - 1);
00063     mtCycleAdvanceJobs.reserve(_numThreads - 1);
00064     engines.reserve(_numThreads);
00065     analogDelayObjects.resize(_numThreads);
00066     thr_barrier = new boost::barrier(_numThreads);
00067     for (int i = 0; i < _numThreads; ++i) {
00068         analogDispatchers[i].setBarrier(thr_barrier);
00069         engines.push_back(new SingleThreadSimEngine(i, spikeScheduler, analogDispatchers[i], net));
00070         if (i < _numThreads - 1) {
00071             SimEngineAdvanceJob the_advance_job(engines[i], net.simParameter().minDelay.in_steps( net.get_dt() ));
00072             advance_jobs.push_back(the_advance_job);
00073             SimEngineResetJob the_reset_job(engines[i]);
00074             reset_jobs.push_back(the_reset_job);
00075             SimEngineInitializeJob the_initialize_job(engines[i]);
00076             initialize_jobs.push_back(the_initialize_job);
00077         }
00078         CycleAdvanceJob the_cycle_advance_job(&analogDelayObjects[i]);
00079         mtCycleAdvanceJobs.push_back(the_cycle_advance_job);
00080     }
00081 }
00082 
00083 MultiThreadSimEngine::~MultiThreadSimEngine()
00084 {
00085     for (int i = 0 ; i < _numThreads ; ++i) {
00086         delete engines[i];
00087     }
00088     delete thr_barrier;
00089 }
00090 
00091 void MultiThreadSimEngine::seed( uint32 noiseSeed )
00092 {
00093     for (int i = 0 ; i < _numThreads ; ++i) {
00094         engines[i]->seed( noiseSeed );
00095     }
00096 }
00097 
00098 void MultiThreadSimEngine::seed( vector<uint32> const& sim_seeds )
00099 {
00100     for (int i = 0 ; i < _numThreads ; ++i) {
00101         engines[i]->seed( sim_seeds[i] );
00102     }
00103 }
00104 
00105 void MultiThreadSimEngine::noiseRandEngineOutput( vector<uint32> & r )
00106 {
00107     r.resize( _numThreads );
00108     for (int i = 0 ; i < _numThreads ; ++i) {
00109         unsigned v = (unsigned)((*(engines[i]->noiseRandomEngine()))() * 200000);
00110         // cerr << "r=" << v << endl;
00111         r[i] = v;
00112     }
00113 }
00114 
00115 void MultiThreadSimEngine::mount( const SimObjectFactory &objFactory, const SimObject::ID &mountpoint, SimObject::ID &id )
00116 {
00117     addObject( objFactory, id );
00118     one_way_link( id, mountpoint );
00119 }
00120 
00121 void MultiThreadSimEngine::insert( const SimObjectFactory &objFactory, const SimObject::ID &mountpoint, SimObject::ID &id )
00122 {
00123     addObject( objFactory, id );
00124     two_way_link( id, mountpoint );
00125 }
00126 
00127 void MultiThreadSimEngine::two_way_link( const SimObject::ID &id1, const SimObject::ID &id2 )
00128 {
00129     SimObject *obj1 = engines[id1.eng]->getObject( id1 );
00130     SimObject *obj2 = engines[id2.eng]->getObject( id2 );
00131 
00132     obj1->incoming( obj2, id2, id1, network );
00133     obj2->outgoing( obj1, id1, id2, network );
00134 
00135     obj2->incoming( obj1, id1, id2, network );
00136     obj1->outgoing( obj2, id2, id1, network );
00137 }
00138 
00139 void MultiThreadSimEngine::one_way_link( const SimObject::ID &src, const SimObject::ID &dst )
00140 {
00141     SimObject *s = engines[src.eng]->getObject( src );
00142     SimObject *d = engines[dst.eng]->getObject( dst );
00143 
00144     d->incoming( s, src, dst, network );
00145     s->outgoing( d, dst, src, network );
00146 }
00147 
00148 void MultiThreadSimEngine::reset()
00149 {
00150     for (int thr = 0 ; thr < _numThreads-1 ; thr++)
00151         thr_pool.dispatch(thr, reset_jobs[thr]);
00152     engines[_numThreads-1]->reset();
00153     thr_pool.waitAll();
00154     currentStepWithinCycle = 0;
00155 }
00156 
00157 void MultiThreadSimEngine::initialize()
00158 {
00159     for (int thr = 0 ; thr < _numThreads-1 ; thr++) {
00160         thr_pool.dispatch(thr, initialize_jobs[thr]);
00161     }
00162     engines[_numThreads-1]->initialize();
00163     thr_pool.waitAll();
00164 }
00165 
00166 
00167 void MultiThreadSimEngine::advance(int nSteps)
00168 {
00169     int nCycles;
00170     int reminderSteps;
00171     int interval = default_steps_per_cycle;
00172     int leftStepsToFinishPreviousCycle;
00173 
00174     if (currentStepWithinCycle) {
00175         if (nSteps + currentStepWithinCycle >= interval) {
00176             leftStepsToFinishPreviousCycle = default_steps_per_cycle - currentStepWithinCycle;
00177             advanceSeveralStepsWithinACycle(leftStepsToFinishPreviousCycle);
00178             finalizeCycle();
00179         } else {
00180             leftStepsToFinishPreviousCycle = nSteps;
00181             advanceSeveralStepsWithinACycle(leftStepsToFinishPreviousCycle);
00182         }
00183         nSteps -=leftStepsToFinishPreviousCycle;
00184     }
00185 
00186 
00187     reminderSteps = nSteps % interval;
00188     nCycles = (nSteps - reminderSteps) / interval ;
00189     for (int c = 0 ; c < nCycles ; ++c ) {
00190         advanceOneFullCycle();
00191     }
00192 
00193     if (reminderSteps > 0 )
00194         advanceSeveralStepsWithinACycle(reminderSteps);
00195 
00196     currentStepWithinCycle = reminderSteps;
00197 }
00198 
00199 
00200 void MultiThreadSimEngine::advanceSeveralStepsWithinACycle(int steps)
00201 {
00202     for (int i = 0 ; i < _numThreads-1 ; i++ )
00203         advance_jobs[i].setNumSteps(steps);
00204 
00205     for (int thr = 0 ; thr < _numThreads-1; ++thr)
00206         thr_pool.dispatch(thr, advance_jobs[thr]);
00207 
00208     engines[_numThreads-1]->advance(steps);
00209     thr_pool.waitAll();
00210 }
00211 
00212 
00213 void MultiThreadSimEngine::finalizeCycle()
00214 {
00215     spikeScheduler.nextCycle();
00216 
00217     for (int thr = 0; thr < _numThreads-1; ++thr) {
00218         if (analogDelayObjects[thr].size()) {
00219             thr_pool.dispatch(thr, mtCycleAdvanceJobs[thr]);
00220         }
00221     }
00222     if (analogDelayObjects[_numThreads-1].size()) {
00223         mtCycleAdvanceJobs[_numThreads-1].start();
00224     }
00225     thr_pool.waitAll();
00226 }
00227 
00228 void MultiThreadSimEngine::advanceOneFullCycle()
00229 {
00230     advanceSeveralStepsWithinACycle(default_steps_per_cycle);
00231     finalizeCycle();
00232 }
00233 
00234 SingleThreadSimEngine & MultiThreadSimEngine::getSTEngine(engineid_t eng)
00235 {
00236     return *engines[eng];
00237 }
00238 
00239 

Generated on Wed Jul 9 16:34:38 2008 for PCSIM by  doxygen 1.5.5