00001 #include "DistributedSingleThreadNetwork.h"
00002 #include "MPIAllToAllCommunicatorFactory.h"
00003
00004 #include "SingleThreadNetwork.h"
00005 #include "SpikeSender.h"
00006
00007 #include <boost/format.hpp>
00008
00009 #include <iostream>
00010 #include <string>
00011
00012 using std::string;
00013
00014 using std::cout;
00015 using std::cerr;
00016 using std::endl;
00017
00018
00019
00020
00021
00022
00023 DistributedSingleThreadNetwork::DistributedSingleThreadNetwork( SimParameter sp )
00024 : DistributedNetwork(1, MPI::COMM_WORLD, sp )
00025 {
00026 init();
00027 }
00028
00029 DistributedSingleThreadNetwork::DistributedSingleThreadNetwork( MPI::Intracomm &mpiCommunicator, SimParameter sp )
00030 : DistributedNetwork(1, mpiCommunicator, sp )
00031 {
00032 init();
00033 }
00034
00035 void DistributedSingleThreadNetwork::init()
00036 {
00037
00038 spikeBuffer = new PropagatedSpikeBuffer(simParam.minDelay.in_steps( get_dt() ), simParam.maxDelay.in_steps( get_dt() ) );
00039 localDelayMap = new LocalDelayMap;
00040
00041 stgPool = new SpikeTargetGroupPool;
00042
00043 localSpikeScheduler = new SingleThreadSpikeScheduler(*localDelayMap, *stgPool, *spikeBuffer, simParam) ;
00044
00045 targetNodesMap = new TargetNodesMap();
00046 globalDelayMap = new GlobalSingleTargetDelayMap();
00047
00048 mpiOutBuffers = new MPIOutputBufferVector(mpi_comm.Get_size(), glengineids[mpi_comm.Get_rank()]);
00049
00050 mpiInBuffers = new MPIInputBufferVector(glengineids, mpi_comm.Get_size());
00051
00052 spikeScheduler = new DistributedSpikeScheduler(*localSpikeScheduler,
00053 *targetNodesMap,
00054 *mpiOutBuffers);
00055
00056 localAnalogMsgDispatcher = new SingleThreadAnalogMsgDispatcher;
00057
00058
00059 for (int i = 0; i < mpi_comm.Get_size(); ++i) {
00060 sources2BufPositionsMaps.push_back(new DistIncomingAnalogSources2BufPosMap);
00061 incomingDispatchers.push_back(new DistributedIncomingAnalogMsgDispatcher(
00062 (delay_t)simParam.minDelay.in_steps( get_dt() ),
00063 &(*mpiInBuffers)[i],
00064 sources2BufPositionsMaps[i]));
00065
00066 outgoingDispatchers.push_back(new DistributedOutgoingAnalogMsgDispatcher(
00067 0,
00068 (delay_t)simParam.minDelay.in_steps( get_dt() ),
00069 &(*mpiOutBuffers)[i]));
00070 }
00071
00072 distAnalogMsgDispatcher = new DistributedAnalogMessageDispatcher(mpi_comm.Get_size(),
00073 simParam.minDelay.in_steps( get_dt() ),
00074 incomingDispatchers,
00075 outgoingDispatchers,
00076 *localAnalogMsgDispatcher);
00077
00078
00079
00080 SingleThreadSimEngine *ste = new SingleThreadSimEngine(0, *spikeScheduler, *distAnalogMsgDispatcher, *this );
00081 localSimEngine = ste;
00082
00083
00084
00085 delayObjectMap = new AnalogDelayObjectMap;
00086
00087 localAnalogMsgCreator = new STAnalogMessageCreator(
00088 *static_cast<SingleThreadAnalogMsgDispatcher *>(localAnalogMsgDispatcher),
00089 *ste,
00090 *delayObjectMap,
00091 (delay_t)simParam.minDelay.in_steps( get_dt() )) ;
00092
00093
00094 distAnalogMsgCreator = new DistributedAnalogMessageCreator(
00095 *ste,
00096 *distAnalogMsgDispatcher,
00097 (delay_t)simParam.minDelay.in_steps( get_dt() ));
00098
00099 distIncomingSpikeScheduler = new STDistributedIncomingSpikeScheduler(*mpiInBuffers,
00100 *globalDelayMap,
00101 *spikeBuffer,
00102 (delay_t)simParam.minDelay.in_steps( get_dt() ));
00103
00104
00105 MPIAllToAllCommunicatorFactory mpi_comm_factory(MPIAllToAllCommunicatorFactory::Default);
00106
00107 mpiAllToAllComm = mpi_comm_factory.getCommunicator(*mpiInBuffers,
00108 *mpiOutBuffers,
00109 mpi_comm,
00110 incomingConnections,
00111 outgoingConnections);
00112
00113 distEngine = new DistributedSimEngine(*localSimEngine,
00114 *distIncomingSpikeScheduler,
00115 *spikeScheduler,
00116 *distAnalogMsgDispatcher,
00117 *mpiAllToAllComm,
00118 *this );
00119
00120 setupConstructRNGEngines();
00121 DistributedNetwork::seed_noise_rng( makeSeed( simParam.simulationRNGSeed ) );
00122
00123
00124 mpi_comm.Barrier();
00125
00126 }
00127
00128 DistributedSingleThreadNetwork::~DistributedSingleThreadNetwork()
00129 {
00130 delete distEngine;
00131 delete mpiAllToAllComm;
00132 delete distIncomingSpikeScheduler;
00133 delete localAnalogMsgDispatcher;
00134 delete localSimEngine;
00135 delete spikeScheduler;
00136 delete mpiInBuffers;
00137 delete mpiOutBuffers;
00138 delete globalDelayMap;
00139 delete targetNodesMap;
00140 delete localSpikeScheduler;
00141 delete stgPool;
00142 delete localDelayMap;
00143 delete spikeBuffer;
00144 delete localAnalogMsgCreator;
00145 for (int i = 0; i < mpi_comm.Get_size() ; ++i) {
00146 delete incomingDispatchers[i];
00147 delete outgoingDispatchers[i];
00148 delete sources2BufPositionsMaps[i];
00149 }
00150 delete distAnalogMsgDispatcher;
00151 delete distAnalogMsgCreator;
00152 delete delayObjectMap;
00153 }
00154
00155
00156 SimObject * DistributedSingleThreadNetwork::_getObject_(const SimObject::ID &id)
00157 {
00158 if (id.node == mpi_comm.Get_rank()) {
00159 SimObject *p = localSimEngine->getObject(id);
00160 return p;
00161 }
00162 return NULL;
00163 }
00164
00165
00166 void DistributedSingleThreadNetwork::_addSpikeMessage_(const SimObject::ID &sender, const port_t, const spike_port_id_t sender_port, const SimObject::ID &receiver, const port_t port, const step_t delay)
00167 {
00168
00169 if (sender.node != receiver.node) {
00170 if (mpi_comm.Get_rank() == sender.node) {
00171 outgoingConnections[receiver.node] = true;
00172 } else if (mpi_comm.Get_rank() == receiver.node) {
00173 incomingConnections[sender.node] = true;
00174 }
00175 }
00176
00177 if (sender.node == mpi_comm.Get_rank() && receiver.node == mpi_comm.Get_rank()) {
00178
00179 if ( delay > simParam.maxDelay.in_steps( get_dt() ) ) {
00180 throw(
00181 PCSIM::ConstructionException(
00182 "DistributedSingleThreadNetwork::addSpikeMessage",
00183 str( boost::format("Specified delay (%1%) out of range (min=%2% ms, max=%3% ms).") % (delay * simParam.dt.in_ms()) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
00184 )
00185 );
00186 }
00187
00188 SingleThreadNetwork::addLocalSpikeMessage( localDelayMap, stgPool, localSimEngine, sender_port, receiver, port, delay );
00189
00190 } else if (sender.node == mpi_comm.Get_rank()) {
00191 targetNodesMap->addTargetNode(sender_port, receiver.node );
00192 _nSpikeMessages++;
00193 } else if (receiver.node == mpi_comm.Get_rank()) {
00194
00195 if( delay < simParam.minDelay.in_steps( get_dt() ) || delay > simParam.maxDelay.in_steps( get_dt() ) ) {
00196 throw(
00197 PCSIM::ConstructionException(
00198 "DistributedSingleThreadNetwork::addSpikeMessage",
00199 str( boost::format("Specified delay (%1%) out of range (min=%2% ms, max=%3% ms).") % (delay * simParam.dt.in_ms()) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
00200 )
00201 );
00202 }
00203
00204 spikegroupid_t tg = globalDelayMap->find(glengineids[sender.node][sender.eng], sender_port, (delaystep_t)delay);
00205
00206 if( tg == no_spikegroup ) {
00207 tg = stgPool->addSpikeTarget( localSimEngine->getObject(receiver), port );
00208 globalDelayMap->insert( glengineids[sender.node][sender.eng], sender_port, (delaystep_t)delay, tg );
00209 } else {
00210 stgPool->addSpikeTarget( tg, localSimEngine->getObject(receiver), port);
00211 }
00212 _nSpikeMessages++;
00213 }
00214
00215 }
00216
00217
00218 template<typename analogSrcType, typename analogDestType>
00219 inline void DistributedSingleThreadNetwork::addGenericAnalogMessage(const SimObject::ID &sender, analogSrcType srcPortOrField, const SimObject::ID &receiver, analogDestType destPortOrField, delay_t delay)
00220 {
00221
00222 if (sender.node != receiver.node) {
00223 if (mpi_comm.Get_rank() == sender.node) {
00224 outgoingConnections[receiver.node] = true;
00225 } else if (mpi_comm.Get_rank() == receiver.node) {
00226 incomingConnections[sender.node] = true;
00227 }
00228 }
00229
00230 if (sender.node == mpi_comm.Get_rank() && receiver.node == mpi_comm.Get_rank()) {
00231
00232 localAnalogMsgCreator->addAnalogMessage( sender, srcPortOrField, receiver, destPortOrField, delay );
00233
00234 } else if (sender.node == mpi_comm.Get_rank()) {
00235 distAnalogMsgCreator->addOutgoingAnalogMessage( sender, srcPortOrField, receiver.node);
00236
00237 } else if (receiver.node == mpi_comm.Get_rank()) {
00238 if ( delay < simParam.minDelay.in_steps( get_dt() ) || delay > simParam.maxDelay.in_steps( get_dt() ) ) {
00239 throw(
00240 PCSIM::ConstructionException(
00241 "DistributedSingleThreadNetwork::addAnalogMessage",
00242 str( boost::format("Specified delay (%1%) out of range (min=%2% ms, max=%3% ms).") % (delay * simParam.dt.in_ms()) % simParam.minDelay.in_ms() % simParam.maxDelay.in_ms() )
00243 )
00244 );
00245 }
00246 distAnalogMsgCreator->addIncomingAnalogMessage( sender, srcPortOrField, receiver, destPortOrField, delay);
00247 }
00248 }
00249
00250 void DistributedSingleThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, int recv_port, const Time &delay)
00251 {
00252 addGenericAnalogMessage(sender, (analog_port_id_t)sender_port, receiver, (analog_port_id_t)recv_port, (delay_t)delay.in_steps( get_dt() ));
00253 }
00254
00255 void DistributedSingleThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, int sender_port, const SimObject::ID &receiver, string destfield, const Time &delay)
00256 {
00257 addGenericAnalogMessage(sender, (analog_port_id_t)sender_port, receiver, destfield, (delay_t)delay.in_steps( get_dt() ));
00258 }
00259
00260 void DistributedSingleThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, int recv_port, const Time &delay)
00261 {
00262 addGenericAnalogMessage(sender, srcfield, receiver, (analog_port_id_t)recv_port, (delay_t)delay.in_steps( get_dt() ) );
00263 }
00264
00265 void DistributedSingleThreadNetwork::_addAnalogMessage_(const SimObject::ID &sender, string srcfield, const SimObject::ID &receiver, string destfield, const Time &delay)
00266 {
00267 addGenericAnalogMessage(sender, srcfield, receiver, destfield, (delay_t)delay.in_steps( get_dt() ) );
00268 }
00269
00270
00271
00272
00273 void DistributedSingleThreadNetwork::_initialize_()
00274 {
00275 if (!initialized) {
00276 localSimEngine->initialize();
00277 mpiInBuffers->initialize(simParam.minDelay.in_steps( get_dt() ), 0, MPIBUFFER_BLOCK_SIZE);
00278 mpiOutBuffers->initialize(simParam.minDelay.in_steps( get_dt() ), 0, MPIBUFFER_BLOCK_SIZE);
00279 distAnalogMsgDispatcher->initialize();
00280 initialized = true;
00281 }
00282 }
00283
00284 void DistributedSingleThreadNetwork::_reset_()
00285 {
00286 if ( ! initialized )
00287 initialize();
00288 spikeScheduler->reset();
00289 distEngine->reset();
00290 distAnalogMsgDispatcher->reset(get_dt().in_sec());
00291 reseted = true;
00292 }
00293
00294
00295 void DistributedSingleThreadNetwork::_advance_( int nSteps )
00296 {
00297 if( ! reseted )
00298 reset();
00299 distEngine->advance( nSteps );
00300 }
00301