00001 #include "MultiThreadNetwork.h"
00002 #include "SpikeSender.h"
00003 #include "SingleThreadNetwork.h"
00004 #include "PCSIMException.h"
00005
00006 #include <mpi.h>
00007
00008 #include <string>
00009 using std::string;
00010
00011 #include <boost/format.hpp>
00012
00013
00014
00015 MultiThreadNetwork::MultiThreadNetwork(int nT, SimParameter sp ) :
00016
00017 SimNetwork( MPI::COMM_WORLD, sp, &localRoundRobin ), nThreads(nT), thrPool( nThreads-1 )
00018 {
00019 init();
00020 }
00021
00022 MultiThreadNetwork::MultiThreadNetwork(int nT, MPI::Intracomm &comm, SimParameter sp ) :
00023
00024 SimNetwork( comm, sp, &localRoundRobin ), nThreads(nT), thrPool( nThreads-1 )
00025 {
00026 init();
00027 }
00028
00029 void MultiThreadNetwork::init()
00030 {
00031 if (simParam.minDelay.in_sec() < simParam.dt.in_sec())
00032 throw(
00033 PCSIM::ConstructionException("MultiThreadNetwork::init",
00034 str(boost::format("minDelay (%1%) smaller than dt (%2%)")
00035 % simParam.minDelay.in_sec() % simParam.dt.in_sec() )
00036 )
00037 );
00038
00039 tables = new MTSpikeRoutingTables(nThreads);
00040
00041 for (int i = 0; i < nThreads; ++i) {
00042 PropagatedSpikeBuffer *buf = new PropagatedSpikeBuffer(simParam.minDelay.in_steps( simParam.dt ),
00043 simParam.maxDelay.in_steps( simParam.dt ));
00044 STBuffers.push_back(buf);
00045 }
00046
00047 spikeScheduler = new MultiThreadSpikeScheduler(nThreads, *tables, STBuffers, simParam );
00048
00049 analogDelayObjectsMap = new AnalogDelayObjectMap;
00050
00051 for (int i = 0 ; i < nThreads; ++i)
00052 analogMsgDispatchers.push_back(new MultiThreadAnalogMsgDispatcher( (delay_t)simParam.minDelay.in_steps( simParam.dt) ) );
00053
00054
00055
00056 AnalogMessageDispatcherVectorImpl<MultiThreadAnalogMsgDispatcher>
00057 dispatchersVector(analogMsgDispatchers);
00058
00059 simEngine = new MultiThreadSimEngine(0, nThreads, thrPool, *spikeScheduler,
00060 dispatchersVector,
00061 *this );
00062
00063
00064 for (int i = 0; i < nThreads ; ++i) {
00065 stAnalogMsgCreators.push_back(new STAnalogMessageCreator(analogMsgDispatchers[i]->sTLocalDispatcher(),
00066 simEngine->getSTEngine(i), *analogDelayObjectsMap,
00067 (delay_t)simParam.minDelay.in_steps( simParam.dt) ) );
00068 }
00069
00070 mtAnalogMsgCreator = new MTAnalogMessageCreator(analogMsgDispatchers, *simEngine, *analogDelayObjectsMap,
00071 (delay_t)simParam.minDelay.in_steps( simParam.dt));
00072
00073 initialized = false;
00074
00075 setupConstructRNGEngines();
00076 MultiThreadNetwork::seed_noise_rng( makeSeed( simParam.simulationRNGSeed ) );
00077
00078 }
00079
00080 MultiThreadNetwork::~MultiThreadNetwork()
00081 {
00082 delete simEngine;
00083 delete spikeScheduler;
00084 delete tables;
00085 delete analogDelayObjectsMap;
00086 for (int i = 0; i < nThreads; ++i) {
00087 delete STBuffers[i];
00088 delete stAnalogMsgCreators[i];
00089 delete analogMsgDispatchers[i];
00090 }
00091 delete mtAnalogMsgCreator;
00092 }
00093
00094 void MultiThreadNetwork::seed_noise_rng( uint32 noiseRNGseed )
00095 {
00096 vector<uint32> sim_seeds(nThreads);
00097 fillSeedVector( noiseRNGseed, sim_seeds );
00098 simEngine->seed( sim_seeds );
00099 objectVariationRNDEngine->seed( makeSeed( simParam.constructionRNGSeed ) ) ;
00100 }
00101
00102
00103
00104 void MultiThreadNetwork::_addObject_( const SimObjectFactory &objFactory, const SimEngine::ID &loc, SimObject::ID &id )
00105 {
00106 if( loc.node == _mpi_rank && loc.engine < this->nThreads ) {
00107 id.node = _mpi_rank;
00108 id.eng = loc.engine;
00109 simEngine->addObject( objFactory, id );
00110 } else {
00111 id = SimObject::ID::Invalid;
00112 if( loc.node != _mpi_rank )
00113 throw(
00114 PCSIM::ConstructionException(
00115 "MultiThreadNetwork::addObject",
00116 str( boost::format("Specified node (%1%) not equal to mpi rank (%2%)" ) % loc.node % _mpi_rank )
00117 )
00118 );
00119 else
00120 throw(
00121 PCSIM::ConstructionException(
00122 "MultiThreadNetwork::addObject",
00123 str( boost::format("Specified engine (%1%) out of range (0,%2%)" ) % 0 % this->nThreads )
00124 )
00125 );
00126 }
00127 }
00128
00129 void MultiThreadNetwork::_addObject_( const SimObjectFactory &objFactory, SimObject::ID &id )
00130 {
00131
00132 SimEngine::ID loc = (*distributionStrategy)( this );
00133 return addObject( objFactory, loc, id );
00134 }
00135
00136 void MultiThreadNetwork::_mount_( const SimObjectFactory &objFactory, const SimObject::ID &mountpoint, SimObject::ID &id )
00137 {
00138 if( mountpoint.node == _mpi_rank && mountpoint.eng < this->nThreads ) {
00139 id.node = _mpi_rank;
00140 id.eng = mountpoint.eng;
00141 simEngine->mount( objFactory, mountpoint, id );
00142 } else {
00143 id = SimObject::ID::Invalid;
00144 if( mountpoint.node != _mpi_rank )
00145 throw(
00146 PCSIM::ConstructionException(
00147 "MultiThreadNetwork::mount",
00148 str( boost::format("Specified node (%1%) not equal to mpi rank (%2%)" ) % mountpoint.node % _mpi_rank )
00149 )
00150 );
00151 else
00152 throw(
00153 PCSIM::ConstructionException(
00154 "MultiThreadNetwork::mount",
00155 str( boost::format("Specified engine (%1%) out of range (0,%2%)" ) % 0 % this->nThreads )
00156 )
00157 );
00158 }
00159 }
00160
00161 void MultiThreadNetwork::_insert_( const SimObjectFactory &objFactory, const SimObject::ID &container, SimObject::ID &id )
00162 {
00163 if( container.node == _mpi_rank && container.eng < this->nThreads ) {
00164 id.node = _mpi_rank;
00165 id.eng = container.eng;
00166 simEngine->insert( objFactory, container, id );
00167 } else {
00168 id = SimObject::ID::Invalid;
00169 if( container.node != _mpi_rank )
00170 throw(
00171 PCSIM::ConstructionException(
00172 "MultiThreadNetwork::mount",
00173 str( boost::format("Specified node (%1%) not equal to mpi rank (%2%)") % container.node % _mpi_rank )
00174 )
00175 );
00176 else
00177 throw(
00178 PCSIM::ConstructionException(
00179 "MultiThreadNetwork::mount",
00180 str( boost::format("Specified engine (%1%) out of range (0,%2%)") % 0 % this->nThreads )
00181 )
00182 );
00183 }
00184 }
00185
00186
00187 SimObject * MultiThreadNetwork::_getObject_(const SimObject::ID &id)
00188 {
00189 return simEngine->getObject(id);
00190 }
00191
00192
00193
00194 void MultiThreadNetwork::_connect_( SimObject::ID const& src, port_t out, const SimObject::ID &dst, port_t in, int delay )
00195 {
00196 SimObject *src_obj = simEngine->getObject(src);
00197 SimObject *dst_obj = simEngine->getObject(dst);
00198
00199 Time delay_to_use;
00200 if( delay < 0 ) {
00201 delay_to_use = Time::sec( dst_obj->getManagedDelay() );
00202 } else {
00203 delay_to_use = Time::steps( delay, get_dt() );
00204 }
00205
00206 if( src_obj->outputPortType(out) == SimObject::spiking && dst_obj->inputPortType( in ) == SimObject::spiking ) {
00207 addSpikeMessage( src, out, dst, in, delay_to_use );
00208 } else if ( src_obj->outputPortType(out) == SimObject::analog && dst_obj->inputPortType( in ) == SimObject::analog ) {
00209 addAnalogMessage( src, out, dst, in, delay_to_use );
00210 } else {
00211 throw(
00212 PCSIM::ConstructionException(
00213 "MultiThreadNetwork::_connect_",
00214 str( boost::format("Can not connect specified source (%1%) and destination (%2%) object: no matching out (%3%) and in (%4%) ports.") % src.toString() % dst.toString() % out % in )
00215 )
00216 );
00217 }
00218
00219 }
00220
00221
00222
00223 void MultiThreadNetwork::_addSpikeMessage_(const SimObject::ID &sender, const port_t out, const SimObject::ID &receiver, const port_t in, const Time &delay)
00224 {
00225 if ( ( sender.eng != receiver.eng && delay < simParam.minDelay) || delay > simParam.maxDelay ) {
00226 throw(
00227 PCSIM::ConstructionException(
00228 "MultiThreadNetwork::addSpikeMessage",
00229 str( boost::format("Specified delay (%1%) out of range (min=%2% ms, max=%3% ms)." ) % delay.in_ms() % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
00230 )
00231 );
00232 }
00233 SimObject *src_obj = simEngine->getObject(sender);
00234 SpikeSender *ss = dynamic_cast<SpikeSender*>( src_obj );
00235 if( ss == NULL ) {
00236 throw( PCSIM::ConstructionException( "MultiThreadNetwork::addSpikeMessage",
00237 str( boost::format("Specified simulation object (%1%, %2%) is not a spike sender.") % typeid(*src_obj).name() % sender.toString() ) ) ) ;
00238 }
00239 if( ss->getSpikePort( out ) != NULL ) {
00240 MultiThreadNetwork::addLocalSpikeMessage( tables, simEngine, simParam, sender, ss->getSpikePort( out )->ID(), receiver, in, delay.in_steps( get_dt() ) );
00241 _nSpikeMessages++;
00242 } else {
00243 throw( PCSIM::ConstructionException( "MultiThreadNetwork::addSpikeMessage", str( boost::format("Specified spike output port (%1%) does not exist") % out ) ) );
00244 }
00245 }
00246
00247 void MultiThreadNetwork::addLocalSpikeMessage(
00248 MTSpikeRoutingTables *arg_tables, SimEngine *arg_simEngine, const SimParameter &SP,
00249 const SimObject::ID &sender, spike_port_id_t sender_port, const SimObject::ID &receiver, const port_t in_port, step_t delay )
00250 {
00251 if ( sender.eng == receiver.eng ) {
00252 SingleThreadNetwork::addLocalSpikeMessage( &(arg_tables->localDelayMaps[sender.eng]), &(arg_tables->stgPool), arg_simEngine, sender_port, receiver, in_port, delay );
00253 } else {
00254 if( (step_t)delay < SP.minDelay.in_steps( SP.dt ) ) {
00255 throw( PCSIM::ConstructionException( "MultiThreadNetwork::addSpikeMessage",
00256 str( boost::format("Specified delay (%1%) smaller than minDelay (%2% ms).") % delay % SP.minDelay.in_ms() ) ) );
00257 }
00258 spikegroupid_t tg = arg_tables->mTDelayMap.find(sender.eng, sender_port, receiver.eng, (delaystep_t)delay);
00259 if( tg == no_spikegroup ) {
00260 tg = arg_tables->stgPool.addSpikeTarget( arg_simEngine->getObject( receiver ), in_port);
00261 arg_tables->mTDelayMap.insert( sender.eng, sender_port, receiver.eng, (delaystep_t)delay, tg );
00262 } else {
00263 arg_tables->stgPool.addSpikeTarget( tg, arg_simEngine->getObject( receiver ), in_port);
00264 }
00265
00266 }
00267 }
00268
00269
00270
00271 void MultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, int recv_port, const Time &delay)
00272 {
00273 _nAnalogMessages++;
00274 addGenericAnalogMessage(sender, sender_port, receiver, recv_port, (delay_t)delay.in_steps( get_dt() ) );
00275 }
00276
00277 void MultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, string destfield, const Time &delay)
00278 {
00279 _nAnalogMessages++;
00280 addGenericAnalogMessage(sender, sender_port, receiver, destfield, (delay_t)delay.in_steps( get_dt() ) );
00281 }
00282
00283 void MultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, int recv_port, const Time &delay)
00284 {
00285 _nAnalogMessages++;
00286 addGenericAnalogMessage(sender, srcfield, receiver, recv_port, (delay_t)delay.in_steps( get_dt() ) );
00287 }
00288
00289 void MultiThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, string destfield, const Time &delay)
00290 {
00291 _nAnalogMessages++;
00292 addGenericAnalogMessage(sender, srcfield, receiver, destfield, (delay_t)delay.in_steps( get_dt() ) );
00293 }
00294
00295
00296
00297 gl_engineid_t MultiThreadNetwork::maxLocalEngineID(void) const
00298 {
00299 return nThreads-1;
00300 }
00301
00302
00303
00304 void MultiThreadNetwork::_initialize_()
00305 {
00306 if (!initialized) {
00307 simEngine->initialize();
00308 vector<MultiThreadAnalogMsgDispatcher *>::const_iterator it;
00309 for ( it = analogMsgDispatchers.begin(); it != analogMsgDispatchers.end(); ++it) {
00310 if (simParam.minDelay.in_steps( simParam.dt ) == 1)
00311 (*it)->setMinDelay(0);
00312 else
00313 (*it)->setMinDelay((delay_t)simParam.minDelay.in_steps(simParam.dt));
00314 }
00315 initialized = true;
00316 }
00317 }
00318
00319 void MultiThreadNetwork::_reset_()
00320 {
00321 if( ! initialized )
00322 initialize();
00323 spikeScheduler->reset();
00324 simEngine->reset();
00325 vector<MultiThreadAnalogMsgDispatcher *>::const_iterator it;
00326 for ( it = analogMsgDispatchers.begin(); it != analogMsgDispatchers.end(); ++it)
00327 (*it)->reset(get_dt().in_sec());
00328 reseted = true;
00329 }
00330
00331 void MultiThreadNetwork::_advance_( int nSteps )
00332 {
00333 if( ! reseted )
00334 reset();
00335 simEngine->advance( nSteps );
00336 }