DistributedMultiThreadNetwork.cpp

Go to the documentation of this file.
00001 #include "DistributedMultiThreadNetwork.h"
00002 #include "MPIAllToAllCommunicatorFactory.h"
00003 #include "MultiThreadNetwork.h"
00004 #include "SpikeSender.h"
00005 #include "PCSIMException.h"
00006 
00007 #include <iostream>
00008 
00009 using std::cout;
00010 using std::cerr;
00011 using std::endl;
00012 
00013 #include <string>
00014 using std::string;
00015 
00016 #include <boost/format.hpp>
00017 
00018 
00019 
00021 DistributedMultiThreadNetwork::DistributedMultiThreadNetwork(int numThreads, SimParameter sp )
00022         : DistributedNetwork( numThreads, MPI::COMM_WORLD, sp ), nThreads(numThreads), thrPool(numThreads-1)
00023 {
00024     init();
00025 }
00026 
00027 DistributedMultiThreadNetwork::DistributedMultiThreadNetwork(int numThreads, MPI::Intracomm &mpiCommunicator, SimParameter sp )
00028         : DistributedNetwork( numThreads, mpiCommunicator, sp ), nThreads(numThreads), thrPool(numThreads-1)
00029 {
00030     init();
00031 }
00032 
00033 
00034 DistributedMultiThreadNetwork::~DistributedMultiThreadNetwork()
00035 {
00036     delete distEngine;
00037     delete mpiAllToAllComm;
00038     delete distIncomingSpikeScheduler;
00039     delete localSimEngine;
00040     delete spikeScheduler;
00041     delete mpiInBuffers;
00042     delete mpiOutBuffers;
00043     delete globalDelayMap;
00044     delete targetNodesMap;
00045     delete localSpikeScheduler;
00046     for (int i = 0 ; i < nThreads ; ++i) {
00047         delete STBuffers[i];
00048     }
00049     delete mtTables;
00050 
00051     // delete dispatchers
00052     for (int i = 0; i < mpi_comm.Get_size(); ++i)
00053         delete sources2BufPositionsMaps[i];
00054 
00055     for (int thr = 0; thr < nThreads; ++thr) {
00056         delete mTanalogMsgDispatchers[thr];
00057         delete distAnalogMsgDispatchers[thr];
00058         for (int i = 0; i < mpi_comm.Get_size(); ++i) {
00059             delete incomingDispatchers[thr][i];
00060             delete outgoingDispatchers[thr][i];
00061         }
00062     }
00063 
00064     delete mtDistCycledAnalogMsgDispatcher;
00065 
00066     // delete creators
00067     delete mtAnalogMsgCreator;
00068     delete analogDelayObjectsMap;
00069     for (int thr = 0; thr < nThreads; ++thr) {
00070         delete stAnalogMsgCreators[thr];
00071         delete distAnalogMessageCreators[thr];
00072     }
00073 
00074 }
00075 
00076 void DistributedMultiThreadNetwork::init()
00077 {
00078         if (simParam.minDelay.in_sec() < simParam.dt.in_sec())
00079                 throw(
00080                     PCSIM::ConstructionException("DistributedMultiThreadNetwork::init",
00081                 str(boost::format("minDelay (%1%) smaller than dt (%2%)") 
00082                     % simParam.minDelay.in_sec() % simParam.dt.in_sec() )    
00083             ) 
00084         );
00085 
00086     mtTables = new MTSpikeRoutingTables(nThreads);
00087 
00088     STBuffers.resize(nThreads);
00089     for (int i = 0 ; i < nThreads ; ++i) {
00090         STBuffers[i] = new PropagatedSpikeBuffer(simParam.minDelay.in_steps( get_dt() ), simParam.maxDelay.in_steps( get_dt() ));
00091     }
00092 
00093     localSpikeScheduler = new MultiThreadSpikeScheduler(nThreads, *mtTables, STBuffers, simParam ) ;
00094 
00095     targetNodesMap = new TargetNodesMap();
00096     globalDelayMap = new GlobalMultiTargetDelayMap(nThreads);
00097 
00098     mpiOutBuffers = new MPIOutputBufferVector(mpi_comm.Get_size(), glengineids[mpi_comm.Get_rank()]);
00099 
00100     mpiInBuffers = new MPIInputBufferVector(glengineids, mpi_comm.Get_size());
00101 
00102     spikeScheduler = new DistributedSpikeScheduler(*localSpikeScheduler,
00103                      *targetNodesMap,
00104                      *mpiOutBuffers,
00105                      nThreads);
00106 
00107 
00108     // ********* setup mt local dispatchers
00109     for (int thr = 0 ; thr < nThreads ; ++thr)
00110         mTanalogMsgDispatchers.push_back(new MultiThreadAnalogMsgDispatcher( (delay_t)simParam.minDelay.in_steps( get_dt() ) ) );
00111 
00112     // ********** setup distributed dispatchers
00113     incomingDispatchers.resize(nThreads);
00114     outgoingDispatchers.resize(nThreads);
00115 
00116     for (int i = 0; i < mpi_comm.Get_size(); ++i)
00117         sources2BufPositionsMaps.push_back(new DistIncomingAnalogSources2BufPosMap);
00118 
00119     for (int thr = 0 ; thr < nThreads; ++thr) {
00120         for (int i = 0; i < mpi_comm.Get_size(); ++i) {
00121             incomingDispatchers[thr].push_back(new DistributedIncomingAnalogMsgDispatcher(
00122                                                    (delay_t)simParam.minDelay.in_steps( get_dt() ),
00123                                                    &(*mpiInBuffers)[i],
00124                                                    sources2BufPositionsMaps[i]));
00125 
00126             outgoingDispatchers[thr].push_back(new DistributedOutgoingAnalogMsgDispatcher(
00127                                                    0,
00128                                                    (delay_t)simParam.minDelay.in_steps( get_dt() ),
00129                                                    &(*mpiOutBuffers)[i]));
00130         }
00131 
00132         distAnalogMsgDispatchers.push_back(new DistributedAnalogMessageDispatcher(mpi_comm.Get_size(),
00133                                            (delay_t)simParam.minDelay.in_steps( get_dt() ),
00134                                            incomingDispatchers[thr],
00135                                            outgoingDispatchers[thr],
00136                                            *mTanalogMsgDispatchers[thr]));
00137     }
00138 
00139     mtDistCycledAnalogMsgDispatcher = new MTDistributedCycledAnalogMsgDispatcher(thrPool,
00140                                       distAnalogMsgDispatchers);
00141 
00142     // local engine
00143     AnalogMessageDispatcherVectorImpl<DistributedAnalogMessageDispatcher> dispatcherVector(distAnalogMsgDispatchers);
00144 
00145     localSimEngine = new MultiThreadSimEngine(0, nThreads, thrPool, *spikeScheduler, dispatcherVector, *this );
00146 
00147     // ****** setup creators
00148     // local creators
00149     analogDelayObjectsMap = new AnalogDelayObjectMap;
00150 
00151     for (int i = 0; i < nThreads ; ++i) {
00152         stAnalogMsgCreators.push_back(new STAnalogMessageCreator(
00153                                           mTanalogMsgDispatchers[i]->sTLocalDispatcher(),
00154                                           (dynamic_cast<MultiThreadSimEngine *>(localSimEngine))->getSTEngine(i),
00155                                           *analogDelayObjectsMap,
00156                                           (delay_t)simParam.minDelay.in_steps( simParam.dt) ) );
00157     }
00158 
00159     mtAnalogMsgCreator = new MTAnalogMessageCreator(mTanalogMsgDispatchers,
00160                          *dynamic_cast<MultiThreadSimEngine *>(localSimEngine),
00161                          *analogDelayObjectsMap,
00162                          (delay_t)simParam.minDelay.in_steps( simParam.dt));
00163     // distributed creators
00164     for (int i = 0; i < nThreads ; ++i) {
00165         distAnalogMessageCreators.push_back( new DistributedAnalogMessageCreator(
00166                                                  (dynamic_cast<MultiThreadSimEngine *>(localSimEngine))->getSTEngine(i),
00167                                                  *distAnalogMsgDispatchers[i],
00168                                                  (delay_t)simParam.minDelay.in_steps( get_dt() )) );
00169 
00170     }
00171 
00172 
00173 
00174     distIncomingSpikeScheduler = new MTDistributedIncomingSpikeScheduler(nThreads,
00175                                  thrPool,
00176                                  *mpiInBuffers,
00177                                  *globalDelayMap,
00178                                  STBuffers,
00179                                  (delay_t)simParam.minDelay.in_steps( get_dt() ));
00180 
00181     MPIAllToAllCommunicatorFactory mpi_comm_factory(MPIAllToAllCommunicatorFactory::Default);
00182 
00183     mpiAllToAllComm = mpi_comm_factory.getCommunicator(*mpiInBuffers,
00184                       *mpiOutBuffers,
00185                       mpi_comm,
00186                       incomingConnections,
00187                       outgoingConnections);
00188 
00189 
00190     distEngine = new DistributedSimEngine(*localSimEngine,
00191                                           *distIncomingSpikeScheduler,
00192                                           *spikeScheduler,
00193                                           *mtDistCycledAnalogMsgDispatcher,
00194                                           *mpiAllToAllComm,
00195                                           *this );
00196 
00197     setupConstructRNGEngines();
00198     DistributedNetwork::seed_noise_rng( makeSeed( simParam.simulationRNGSeed ) );
00199     
00200     // Make sure that all net works are through the init befor anything else can happen
00201     mpi_comm.Barrier();
00202 
00203 }
00204 
00205 
00206 SimObject * DistributedMultiThreadNetwork::_getObject_(const SimObject::ID &id)
00207 {
00208     if (id.node == mpi_comm.Get_rank())
00209         return localSimEngine->getObject(id);
00210     return NULL;
00211 }
00212 
00213 void DistributedMultiThreadNetwork::_addSpikeMessage_(const SimObject::ID &sender, const port_t, const spike_port_id_t sender_port, const SimObject::ID &receiver, const port_t port, const step_t delay)
00214 {
00215 
00216     if (sender.node != receiver.node) {
00217         if (mpi_comm.Get_rank() == sender.node) {
00218             outgoingConnections[receiver.node] = true;
00219         } else if (mpi_comm.Get_rank() == receiver.node) {
00220             incomingConnections[sender.node] = true;
00221         }
00222     }
00223 
00224     if (sender.node == mpi_comm.Get_rank() && receiver.node == mpi_comm.Get_rank()) {
00225 
00226         if( (!sender.isInTheSameSTEngineWith(receiver) && delay < simParam.minDelay.in_steps( get_dt() ) ) || delay > simParam.maxDelay.in_steps( get_dt() ) ) {
00227             throw(
00228                 PCSIM::ConstructionException(
00229                     "DistributedMultiThreadNetwork::addSpikeMessage",
00230                     str( boost::format("Specified delay (%1% ms) out of range (min=%2% ms, max=%3% ms).") 
00231                                           % (delay * simParam.dt.in_ms()) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() ) 
00232                 )
00233             );
00234         }
00235 
00236         MultiThreadNetwork::addLocalSpikeMessage( mtTables, localSimEngine, simParam, sender, sender_port, receiver, port, delay );
00237 
00238     } else if (sender.node == mpi_comm.Get_rank()) {
00239 
00240         targetNodesMap->addTargetNode(sender_port, receiver.node, sender.eng);
00241         _nSpikeMessages++;
00242 
00243     } else if (receiver.node == mpi_comm.Get_rank()) {
00244         if( delay < simParam.minDelay.in_steps( get_dt() ) || delay > simParam.maxDelay.in_steps( get_dt() ) ) {
00245             throw(
00246                 PCSIM::ConstructionException(
00247                     "DistributedMultiThreadNetwork::addSpikeMessage",
00248                     str( boost::format("Specified delay (%1% ms) out of range (min=%2% ms, max=%3% ms)." ) % ( delay * simParam.dt.in_ms() ) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
00249                 )
00250             );
00251         }
00252 
00253         spikegroupid_t tg = (*globalDelayMap)[receiver.eng].find(glengineids[sender.node][sender.eng], sender_port, (delaystep_t)delay);
00254 
00255         if( tg == no_spikegroup ) {
00256             tg = mtTables->stgPool.addSpikeTarget( localSimEngine->getObject(receiver), port );
00257             // cerr << "NODE " << mpi_comm.Get_rank() << " insertdelay(srcnode=" << sender.node << " sed=" << sender_port << ", del=" << delay << ",tg=" << tg <<") type=" << (int)receiver.type << endl;
00258             (*globalDelayMap)[receiver.eng].insert( glengineids[sender.node][sender.eng], sender_port, (delaystep_t)delay, tg );
00259         } else {
00260             mtTables->stgPool.addSpikeTarget( tg, localSimEngine->getObject(receiver), port);
00261         }
00262         _nSpikeMessages++;
00263 
00264     }
00265 
00266 }
00267 
00268 // ------------------------------------ analog messages -------------------------------------- //
00269 template<typename analogSrcType, typename analogDestType>
00270 void DistributedMultiThreadNetwork::addGenericAnalogMessage(const SimObject::ID &sender, analogSrcType srcPortOrField, const SimObject::ID &receiver, analogDestType destPortOrField, delay_t delay)
00271 {
00272 
00273     if (sender.node != receiver.node) {
00274         if (mpi_comm.Get_rank() == sender.node) {
00275             outgoingConnections[receiver.node] = true;
00276         } else if (mpi_comm.Get_rank() == receiver.node) {
00277             incomingConnections[sender.node] = true;
00278         }
00279     }
00280 
00281     if (sender.node == mpi_comm.Get_rank() && receiver.node == mpi_comm.Get_rank()) {
00282         if (sender.eng == receiver.eng) {
00283             stAnalogMsgCreators[sender.eng]->addAnalogMessage(sender,
00284                     srcPortOrField,
00285                     receiver,
00286                     destPortOrField,
00287                     delay);
00288         } else {
00289                 if(  delay < simParam.minDelay.in_steps( simParam.dt ) || delay > simParam.maxDelay.in_steps( simParam.dt ) ) {                 
00290                 throw(
00291                     PCSIM::ConstructionException(
00292                         "DistributedMultiThreadNetwork::addGenericAnalogMessage",
00293                         str( boost::format("Specified delay (%1% ms) out of range (min=%2% ms, max=%3% ms)." ) % (delay * simParam.dt.in_ms()) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
00294                     )
00295                 );
00296             }
00297             mtAnalogMsgCreator->addAnalogMessage(sender,
00298                                                  srcPortOrField,
00299                                                  receiver,
00300                                                  destPortOrField,
00301                                                  delay);
00302         }
00303     } else if (sender.node == mpi_comm.Get_rank()) {
00304         distAnalogMessageCreators[sender.eng]->addOutgoingAnalogMessage( sender, srcPortOrField, receiver.node);
00305 
00306     } else if (receiver.node == mpi_comm.Get_rank()) {
00307         if ( delay < simParam.minDelay.in_steps( get_dt() ) || delay > simParam.maxDelay.in_steps( get_dt() ) ) {
00308             throw(
00309                 PCSIM::ConstructionException(
00310                     "DistributedMultiThreadNetwork::addGenericAnalogMessage",
00311                     str( boost::format("Specified delay (%1% ms) out of range (min=%2% ms, max=%3% ms).") % ( delay * simParam.dt.in_ms() ) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
00312                 )
00313             );
00314         }
00315 
00316         distAnalogMessageCreators[receiver.eng]->addIncomingAnalogMessage( sender, srcPortOrField, receiver, destPortOrField, delay);
00317 
00318     }
00319 
00320 }
00321 
00322 
00323 
00324 void DistributedMultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, int recv_port, const Time &delay)
00325 {
00326     addGenericAnalogMessage(sender, (analog_port_id_t)sender_port, receiver, (analog_port_id_t)recv_port, (delay_t)delay.in_steps( simParam.dt));
00327 }
00328 
00329 
00330 void DistributedMultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, string destfield, const Time &delay)
00331 {
00332     addGenericAnalogMessage(sender, (analog_port_id_t)sender_port, receiver, destfield, (delay_t)delay.in_steps( simParam.dt));
00333 }
00334 
00335 
00336 void DistributedMultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, int recv_port, const Time &delay)
00337 {
00338     addGenericAnalogMessage(sender, srcfield, receiver, (analog_port_id_t)recv_port, (delay_t)delay.in_steps( simParam.dt));
00339 }
00340 
00341 
00342 void DistributedMultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, string destfield, const Time &delay)
00343 {
00344     addGenericAnalogMessage(sender, srcfield, receiver, destfield, (delay_t)delay.in_steps( simParam.dt));
00345 }
00346 
00347 // -------------------------- running the simulation messages -------------------------------- //
00348 
00349 // ------------------------------------------------------------------------------------------- //
00350 
00351 gl_engineid_t DistributedMultiThreadNetwork::maxLocalEngineID(void) const
00352 {
00353     return nThreads-1;
00354 }
00355 
00356 
00357 void DistributedMultiThreadNetwork::_initialize_()
00358 {
00359     if (!initialized) {
00360         localSimEngine->initialize();
00361         mpiInBuffers->initialize(simParam.minDelay.in_steps( get_dt() ), 0, MPIBUFFER_BLOCK_SIZE);
00362         mpiOutBuffers->initialize(simParam.minDelay.in_steps( get_dt() ), 0, MPIBUFFER_BLOCK_SIZE);
00363         for (int i = 0 ; i < nThreads; ++i)
00364             distAnalogMsgDispatchers[i]->initialize();
00365         initialized = true;
00366     }
00367 }
00368 
00369 void DistributedMultiThreadNetwork::_reset_()
00370 {
00371     if( ! initialized )
00372         initialize();
00373     spikeScheduler->reset();
00374     distEngine->reset();
00375     for (int i = 0 ; i < nThreads; ++i) {
00376         distAnalogMsgDispatchers[i]->reset(get_dt().in_sec());
00377         mpiOutBuffers->nextCycle();
00378     }
00379     reseted = true;
00380 }
00381 
00382 void DistributedMultiThreadNetwork::_advance_(int nSteps)
00383 {
00384     if( ! reseted )
00385         reset() ;
00386     distEngine->advance( nSteps );
00387 }
00388 
00390 
00391 void DistributedMultiThreadNetwork::noiseRandEngineOutput( vector<uint32> & R )
00392 {
00393     localSimEngine->noiseRandEngineOutput( R );
00394 }
00395 

Generated on Wed Jul 9 16:34:37 2008 for PCSIM by  doxygen 1.5.5