#include "DistributedMultiThreadNetwork.h"
#include "MPIAllToAllCommunicatorFactory.h"
#include "MultiThreadNetwork.h"
#include "SpikeSender.h"
#include "PCSIMException.h"

#include <iostream>

using std::cout;
using std::cerr;
using std::endl;

#include <string>
using std::string;

#include <boost/format.hpp>

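// Constructors: build a network distributed over MPI nodes with several
// simulation threads per node. The first overload defaults to
// MPI::COMM_WORLD; the second accepts an explicit intra-communicator.
// The thread pool is sized numThreads-1, presumably because the calling
// thread serves as the remaining simulation thread.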
DistributedMultiThreadNetwork::DistributedMultiThreadNetwork(int numThreads, SimParameter sp)
    : DistributedNetwork(numThreads, MPI::COMM_WORLD, sp), nThreads(numThreads), thrPool(numThreads-1)
{
    init();
}

DistributedMultiThreadNetwork::DistributedMultiThreadNetwork(int numThreads, MPI::Intracomm &mpiCommunicator, SimParameter sp)
    : DistributedNetwork(numThreads, mpiCommunicator, sp), nThreads(numThreads), thrPool(numThreads-1)
{
    init();
}

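// Destructor: release everything allocated in init() -- the distributed and
// local simulation engines, spike schedulers, MPI input/output buffers,
// routing tables, delay maps, and the per-thread analog message dispatchers
// and creators.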
DistributedMultiThreadNetwork::~DistributedMultiThreadNetwork()
{
    delete distEngine;
    delete mpiAllToAllComm;
    delete distIncomingSpikeScheduler;
    delete localSimEngine;
    delete spikeScheduler;
    delete mpiInBuffers;
    delete mpiOutBuffers;
    delete globalDelayMap;
    delete targetNodesMap;
    delete localSpikeScheduler;
    for (int i = 0; i < nThreads; ++i) {
        delete STBuffers[i];
    }
    delete mtTables;

    for (int i = 0; i < mpi_comm.Get_size(); ++i)
        delete sources2BufPositionsMaps[i];

    for (int thr = 0; thr < nThreads; ++thr) {
        delete mTanalogMsgDispatchers[thr];
        delete distAnalogMsgDispatchers[thr];
        for (int i = 0; i < mpi_comm.Get_size(); ++i) {
            delete incomingDispatchers[thr][i];
            delete outgoingDispatchers[thr][i];
        }
    }

    delete mtDistCycledAnalogMsgDispatcher;

    delete mtAnalogMsgCreator;
    delete analogDelayObjectsMap;
    for (int thr = 0; thr < nThreads; ++thr) {
        delete stAnalogMsgCreators[thr];
        delete distAnalogMessageCreators[thr];
    }
}

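// init(): wires up the complete simulation infrastructure for this node --
// spike routing tables and buffers, MPI exchange buffers, spike and analog
// message schedulers/dispatchers, the multi-threaded local engine and the
// distributed engine on top of it -- and ends with an MPI barrier so that
// all nodes leave construction together.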
void DistributedMultiThreadNetwork::init()
{
    if (simParam.minDelay.in_sec() < simParam.dt.in_sec())
        throw(
            PCSIM::ConstructionException("DistributedMultiThreadNetwork::init",
                str(boost::format("minDelay (%1%) smaller than dt (%2%)")
                    % simParam.minDelay.in_sec() % simParam.dt.in_sec())
            )
        );

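    // Spike routing: one routing table set shared by all threads, one
    // propagated-spike buffer per thread (sized by minDelay/maxDelay in
    // simulation steps), plus the node-local multi-thread spike scheduler and
    // the distributed scheduler that routes spikes into the MPI output buffers.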
    mtTables = new MTSpikeRoutingTables(nThreads);

    STBuffers.resize(nThreads);
    for (int i = 0; i < nThreads; ++i) {
        STBuffers[i] = new PropagatedSpikeBuffer(simParam.minDelay.in_steps( get_dt() ), simParam.maxDelay.in_steps( get_dt() ));
    }

    localSpikeScheduler = new MultiThreadSpikeScheduler(nThreads, *mtTables, STBuffers, simParam);

    targetNodesMap = new TargetNodesMap();
    globalDelayMap = new GlobalMultiTargetDelayMap(nThreads);

    mpiOutBuffers = new MPIOutputBufferVector(mpi_comm.Get_size(), glengineids[mpi_comm.Get_rank()]);

    mpiInBuffers = new MPIInputBufferVector(glengineids, mpi_comm.Get_size());

    spikeScheduler = new DistributedSpikeScheduler(*localSpikeScheduler,
                                                   *targetNodesMap,
                                                   *mpiOutBuffers,
                                                   nThreads);

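    // Analog messaging: each thread gets a local multi-thread dispatcher and,
    // per MPI node, one incoming and one outgoing dispatcher tied to the
    // corresponding MPI buffer. These are then bundled into one
    // DistributedAnalogMessageDispatcher per thread.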
    for (int thr = 0; thr < nThreads; ++thr)
        mTanalogMsgDispatchers.push_back(new MultiThreadAnalogMsgDispatcher( (delay_t)simParam.minDelay.in_steps( get_dt() ) ));

    incomingDispatchers.resize(nThreads);
    outgoingDispatchers.resize(nThreads);

    for (int i = 0; i < mpi_comm.Get_size(); ++i)
        sources2BufPositionsMaps.push_back(new DistIncomingAnalogSources2BufPosMap);

    for (int thr = 0; thr < nThreads; ++thr) {
        for (int i = 0; i < mpi_comm.Get_size(); ++i) {
            incomingDispatchers[thr].push_back(new DistributedIncomingAnalogMsgDispatcher(
                    (delay_t)simParam.minDelay.in_steps( get_dt() ),
                    &(*mpiInBuffers)[i],
                    sources2BufPositionsMaps[i]));

            outgoingDispatchers[thr].push_back(new DistributedOutgoingAnalogMsgDispatcher(
                    0,
                    (delay_t)simParam.minDelay.in_steps( get_dt() ),
                    &(*mpiOutBuffers)[i]));
        }

        distAnalogMsgDispatchers.push_back(new DistributedAnalogMessageDispatcher(mpi_comm.Get_size(),
                (delay_t)simParam.minDelay.in_steps( get_dt() ),
                incomingDispatchers[thr],
                outgoingDispatchers[thr],
                *mTanalogMsgDispatchers[thr]));
    }

    mtDistCycledAnalogMsgDispatcher = new MTDistributedCycledAnalogMsgDispatcher(thrPool,
            distAnalogMsgDispatchers);

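    // Local simulation engine: a MultiThreadSimEngine running on the thread
    // pool, driven by the distributed spike scheduler and the per-thread
    // analog dispatchers collected above.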
    AnalogMessageDispatcherVectorImpl<DistributedAnalogMessageDispatcher> dispatcherVector(distAnalogMsgDispatchers);

    localSimEngine = new MultiThreadSimEngine(0, nThreads, thrPool, *spikeScheduler, dispatcherVector, *this);

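    // Analog message creators: one single-thread creator per thread for
    // connections within a thread, one multi-thread creator for connections
    // between threads on this node, and one distributed creator per thread
    // for connections that cross node boundaries.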
    analogDelayObjectsMap = new AnalogDelayObjectMap;

    for (int i = 0; i < nThreads; ++i) {
        stAnalogMsgCreators.push_back(new STAnalogMessageCreator(
                mTanalogMsgDispatchers[i]->sTLocalDispatcher(),
                (dynamic_cast<MultiThreadSimEngine *>(localSimEngine))->getSTEngine(i),
                *analogDelayObjectsMap,
                (delay_t)simParam.minDelay.in_steps( simParam.dt )));
    }

    mtAnalogMsgCreator = new MTAnalogMessageCreator(mTanalogMsgDispatchers,
            *dynamic_cast<MultiThreadSimEngine *>(localSimEngine),
            *analogDelayObjectsMap,
            (delay_t)simParam.minDelay.in_steps( simParam.dt ));

    for (int i = 0; i < nThreads; ++i) {
        distAnalogMessageCreators.push_back(new DistributedAnalogMessageCreator(
                (dynamic_cast<MultiThreadSimEngine *>(localSimEngine))->getSTEngine(i),
                *distAnalogMsgDispatchers[i],
                (delay_t)simParam.minDelay.in_steps( get_dt() )));
    }

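    // Incoming spikes from other nodes are delivered by a multi-threaded
    // scheduler reading the MPI input buffers; the all-to-all communicator
    // (chosen via the factory's default strategy) exchanges the MPI buffers
    // between nodes, restricted to the node pairs that actually have
    // incoming/outgoing connections.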
    distIncomingSpikeScheduler = new MTDistributedIncomingSpikeScheduler(nThreads,
            thrPool,
            *mpiInBuffers,
            *globalDelayMap,
            STBuffers,
            (delay_t)simParam.minDelay.in_steps( get_dt() ));

    MPIAllToAllCommunicatorFactory mpi_comm_factory(MPIAllToAllCommunicatorFactory::Default);

    mpiAllToAllComm = mpi_comm_factory.getCommunicator(*mpiInBuffers,
            *mpiOutBuffers,
            mpi_comm,
            incomingConnections,
            outgoingConnections);

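    // Top-level distributed engine combining the local engine with the
    // distributed schedulers, the cycled analog dispatcher and the MPI
    // communicator; then seed the construction and noise RNGs and
    // synchronize all nodes before returning.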
    distEngine = new DistributedSimEngine(*localSimEngine,
            *distIncomingSpikeScheduler,
            *spikeScheduler,
            *mtDistCycledAnalogMsgDispatcher,
            *mpiAllToAllComm,
            *this);

    setupConstructRNGEngines();
    DistributedNetwork::seed_noise_rng( makeSeed( simParam.simulationRNGSeed ) );

    mpi_comm.Barrier();
}

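// Return a pointer to the object with the given ID if it lives on this MPI
// node, otherwise NULL.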
SimObject * DistributedMultiThreadNetwork::_getObject_(const SimObject::ID &id)
{
    if (id.node == mpi_comm.Get_rank())
        return localSimEngine->getObject(id);
    return NULL;
}

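// Register a spike connection. Cross-node connections are recorded in the
// incoming/outgoing connection masks used by the all-to-all communicator.
// Purely local connections are delegated to MultiThreadNetwork; for
// connections whose source or target is on this node, the corresponding
// routing entries (target node map on the sending side, global delay map and
// spike target group on the receiving side) are updated. Delays outside
// [minDelay, maxDelay] are rejected, except that within a single ST engine
// delays below minDelay are allowed.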
void DistributedMultiThreadNetwork::_addSpikeMessage_(const SimObject::ID &sender, const port_t, const spike_port_id_t sender_port, const SimObject::ID &receiver, const port_t port, const step_t delay)
{
    if (sender.node != receiver.node) {
        if (mpi_comm.Get_rank() == sender.node) {
            outgoingConnections[receiver.node] = true;
        } else if (mpi_comm.Get_rank() == receiver.node) {
            incomingConnections[sender.node] = true;
        }
    }

    if (sender.node == mpi_comm.Get_rank() && receiver.node == mpi_comm.Get_rank()) {

        if ( (!sender.isInTheSameSTEngineWith(receiver) && delay < simParam.minDelay.in_steps( get_dt() )) || delay > simParam.maxDelay.in_steps( get_dt() ) ) {
            throw(
                PCSIM::ConstructionException(
                    "DistributedMultiThreadNetwork::addSpikeMessage",
                    str( boost::format("Specified delay (%1% ms) out of range (min=%2% ms, max=%3% ms).")
                         % (delay * simParam.dt.in_ms()) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
                )
            );
        }

        MultiThreadNetwork::addLocalSpikeMessage( mtTables, localSimEngine, simParam, sender, sender_port, receiver, port, delay );

    } else if (sender.node == mpi_comm.Get_rank()) {

        targetNodesMap->addTargetNode(sender_port, receiver.node, sender.eng);
        _nSpikeMessages++;

    } else if (receiver.node == mpi_comm.Get_rank()) {
        if ( delay < simParam.minDelay.in_steps( get_dt() ) || delay > simParam.maxDelay.in_steps( get_dt() ) ) {
            throw(
                PCSIM::ConstructionException(
                    "DistributedMultiThreadNetwork::addSpikeMessage",
                    str( boost::format("Specified delay (%1% ms) out of range (min=%2% ms, max=%3% ms).") % ( delay * simParam.dt.in_ms() ) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
                )
            );
        }

        spikegroupid_t tg = (*globalDelayMap)[receiver.eng].find(glengineids[sender.node][sender.eng], sender_port, (delaystep_t)delay);

        if ( tg == no_spikegroup ) {
            tg = mtTables->stgPool.addSpikeTarget( localSimEngine->getObject(receiver), port );

            (*globalDelayMap)[receiver.eng].insert( glengineids[sender.node][sender.eng], sender_port, (delaystep_t)delay, tg );
        } else {
            mtTables->stgPool.addSpikeTarget( tg, localSimEngine->getObject(receiver), port );
        }
        _nSpikeMessages++;
    }
}

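// Register an analog connection; the port-or-field template parameters let
// the same logic serve all four _addAnalogMessage_ overloads below. Local
// same-thread connections go through the single-thread creators, local
// cross-thread connections through the multi-thread creator, and cross-node
// connections through the distributed creators on the sending and receiving
// side respectively. As with spikes, cross-node connections are recorded in
// the connection masks; delays of cross-thread and cross-node connections
// are validated here against [minDelay, maxDelay].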
template<typename analogSrcType, typename analogDestType>
void DistributedMultiThreadNetwork::addGenericAnalogMessage(const SimObject::ID &sender, analogSrcType srcPortOrField, const SimObject::ID &receiver, analogDestType destPortOrField, delay_t delay)
{
    if (sender.node != receiver.node) {
        if (mpi_comm.Get_rank() == sender.node) {
            outgoingConnections[receiver.node] = true;
        } else if (mpi_comm.Get_rank() == receiver.node) {
            incomingConnections[sender.node] = true;
        }
    }

    if (sender.node == mpi_comm.Get_rank() && receiver.node == mpi_comm.Get_rank()) {
        if (sender.eng == receiver.eng) {
            stAnalogMsgCreators[sender.eng]->addAnalogMessage(sender,
                                                              srcPortOrField,
                                                              receiver,
                                                              destPortOrField,
                                                              delay);
        } else {
            if ( delay < simParam.minDelay.in_steps( simParam.dt ) || delay > simParam.maxDelay.in_steps( simParam.dt ) ) {
                throw(
                    PCSIM::ConstructionException(
                        "DistributedMultiThreadNetwork::addGenericAnalogMessage",
                        str( boost::format("Specified delay (%1% ms) out of range (min=%2% ms, max=%3% ms).") % (delay * simParam.dt.in_ms()) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
                    )
                );
            }
            mtAnalogMsgCreator->addAnalogMessage(sender,
                                                 srcPortOrField,
                                                 receiver,
                                                 destPortOrField,
                                                 delay);
        }
    } else if (sender.node == mpi_comm.Get_rank()) {
        distAnalogMessageCreators[sender.eng]->addOutgoingAnalogMessage( sender, srcPortOrField, receiver.node );

    } else if (receiver.node == mpi_comm.Get_rank()) {
        if ( delay < simParam.minDelay.in_steps( get_dt() ) || delay > simParam.maxDelay.in_steps( get_dt() ) ) {
            throw(
                PCSIM::ConstructionException(
                    "DistributedMultiThreadNetwork::addGenericAnalogMessage",
                    str( boost::format("Specified delay (%1% ms) out of range (min=%2% ms, max=%3% ms).") % ( delay * simParam.dt.in_ms() ) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
                )
            );
        }

        distAnalogMessageCreators[receiver.eng]->addIncomingAnalogMessage( sender, srcPortOrField, receiver, destPortOrField, delay );
    }
}

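// The four public _addAnalogMessage_ overloads (port-to-port, port-to-field,
// field-to-port and field-to-field) all forward to the generic template
// above, converting the delay from a Time to simulation steps.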
void DistributedMultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, int recv_port, const Time &delay)
{
    addGenericAnalogMessage(sender, (analog_port_id_t)sender_port, receiver, (analog_port_id_t)recv_port, (delay_t)delay.in_steps( simParam.dt ));
}


void DistributedMultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, string destfield, const Time &delay)
{
    addGenericAnalogMessage(sender, (analog_port_id_t)sender_port, receiver, destfield, (delay_t)delay.in_steps( simParam.dt ));
}


void DistributedMultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, int recv_port, const Time &delay)
{
    addGenericAnalogMessage(sender, srcfield, receiver, (analog_port_id_t)recv_port, (delay_t)delay.in_steps( simParam.dt ));
}


void DistributedMultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, string destfield, const Time &delay)
{
    addGenericAnalogMessage(sender, srcfield, receiver, destfield, (delay_t)delay.in_steps( simParam.dt ));
}

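// The highest local engine ID is nThreads-1, since each simulation thread on
// this node owns one single-thread engine.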
gl_engineid_t DistributedMultiThreadNetwork::maxLocalEngineID(void) const
{
    return nThreads - 1;
}

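// One-time initialization before the first simulation run: initialize the
// local engine, the MPI input/output buffers and the per-thread distributed
// analog dispatchers.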
void DistributedMultiThreadNetwork::_initialize_()
{
    if (!initialized) {
        localSimEngine->initialize();
        mpiInBuffers->initialize(simParam.minDelay.in_steps( get_dt() ), 0, MPIBUFFER_BLOCK_SIZE);
        mpiOutBuffers->initialize(simParam.minDelay.in_steps( get_dt() ), 0, MPIBUFFER_BLOCK_SIZE);
        for (int i = 0; i < nThreads; ++i)
            distAnalogMsgDispatchers[i]->initialize();
        initialized = true;
    }
}

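// Reset the network to simulation time zero: make sure initialization has
// happened, then reset the spike scheduler, the distributed engine and the
// per-thread analog dispatchers, advancing the MPI output buffers to the
// next cycle.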
void DistributedMultiThreadNetwork::_reset_()
{
    if ( !initialized )
        initialize();
    spikeScheduler->reset();
    distEngine->reset();
    for (int i = 0; i < nThreads; ++i) {
        distAnalogMsgDispatchers[i]->reset(get_dt().in_sec());
        mpiOutBuffers->nextCycle();
    }
    reseted = true;
}

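// Advance the simulation by the given number of time steps, resetting first
// if that has not been done yet; the actual stepping is delegated to the
// distributed simulation engine.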
void DistributedMultiThreadNetwork::_advance_(int nSteps)
{
    if ( !reseted )
        reset();
    distEngine->advance( nSteps );
}

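// Collect the current output of the noise random engines from the local
// simulation engine (presumably for inspecting or reproducing RNG state).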
void DistributedMultiThreadNetwork::noiseRandEngineOutput( vector<uint32> & R )
{
    localSimEngine->noiseRandEngineOutput( R );
}