00001 #include "DistributedNetwork.h"
00002 #include "PCSIMException.h"
00003 #include "SpikeSender.h"
00004 #include "SimTime.h"
00005 #include "SimObjectRegistry.h"
00006
00007 #include <string>
00008 using std::string;
00009
00010 #include <boost/format.hpp>
00011
00012 #include <mpi.h>
00013
00014 #include <iostream>
00015 using std::cerr;
00016 using std::endl;
00017
00018 DistributedNetwork::DistributedNetwork( int nThreads, SimParameter sp ) :
00019 SimNetwork(MPI::COMM_WORLD, sp, &globalRoundRobin ), mpi_comm(MPI::COMM_WORLD), _nThreads(nThreads)
00020 {
00021 init();
00022 }
00023
00024 DistributedNetwork::DistributedNetwork( int nThreads, MPI::Intracomm &mpiCommunicator, SimParameter sp ) :
00025 SimNetwork(mpiCommunicator, sp, &globalRoundRobin ), mpi_comm(mpiCommunicator), _nThreads(nThreads)
00026 {
00027 init();
00028 }
00029
00030
00031
00032 void DistributedNetwork::init()
00033 {
00034 setupGlEngineIDs(_nThreads);
00035 incomingConnections.resize(mpi_comm.Get_size(), false);
00036 outgoingConnections.resize(mpi_comm.Get_size(), false);
00037 objectCounter.resize( maxGlobalEngineID()+1 );
00038 spikePortCounter.resize( maxGlobalEngineID()+1 );
00039 for( size_t i=0; i<spikePortCounter.size(); i++)
00040 spikePortCounter[0] = 0;
00041
00042 ghostFirstPortInfo.resize( maxGlobalEngineID()+1 );
00043 _mpi_rank = mpi_comm.Get_rank();
00044
00045 }
00046
00047 DistributedNetwork::~DistributedNetwork()
00048 {
00049
00050 }
00051
00052 uint32 DistributedNetwork::getUniqueSeedOverMpi( uint32 seed )
00053 {
00054 uint32 seed_buffer = seed;
00055 mpi_comm.Bcast((void *)&seed_buffer, (int)1, MPI::UNSIGNED, 0);
00056 return seed_buffer;
00057 }
00058
00059 void DistributedNetwork::seed_noise_rng( uint32 noiseRNGseed )
00060 {
00061 uint32 seed = getUniqueSeedOverMpi( noiseRNGseed );
00062 vector<uint32> global_sim_seeds( maxGlobalEngineID()+1 );
00063 fillSeedVector( seed, global_sim_seeds );
00064
00065 vector<uint32> local_sim_seeds( _nThreads );
00066 for( int e = 0; e < _nThreads; e++ ) {
00067 local_sim_seeds[e] = global_sim_seeds[ glengineids[ _mpi_rank ][ e ] ];
00068 }
00069 localSimEngine->seed( local_sim_seeds );
00070
00071
00072 seed = getUniqueSeedOverMpi( makeSeed( simParam.constructionRNGSeed ) );
00073 global_sim_seeds.resize( mpi_size() + 1 );
00074 fillSeedVector( seed, global_sim_seeds );
00075 objectVariationRNDEngine->seed( global_sim_seeds[ mpi_rank() ] );
00076
00077 }
00078
00079 void DistributedNetwork::setupGlEngineIDs(unsigned int numLocalEngines)
00080 {
00081 uint32 *nEngineArray = new uint32[mpi_comm.Get_size()];
00082
00083 uint32 nEngines = numLocalEngines;
00084 mpi_comm.Allgather((void*)&nEngines, (int)1, MPI::UNSIGNED, (void*)nEngineArray, (int)1, MPI::UNSIGNED);
00085 int glEngineCounter = 0;
00086 glengineids.resize(mpi_comm.Get_size());
00087 for (int i=0 ; i < mpi_comm.Get_size(); ++i) {
00088 glengineids[i].resize(nEngineArray[i]);
00089 for (unsigned int j = 0 ; j < nEngineArray[i] ; ++j) {
00090 glengineids[i][j] = glEngineCounter;
00091 location.push_back( SimEngine::ID( i, j ) );
00092 glEngineCounter++;
00093 }
00094 }
00095
00096 _max_global_engine_id = glEngineCounter-1;
00097 delete [] nEngineArray;
00098 }
00099
00100
00101
00103 gl_engineid_t DistributedNetwork::getGlobalEngineID( const SimEngine::ID &eid ) const
00104 {
00105 if( ( eid.node < glengineids.size() ) && ( eid.engine < glengineids[ eid.node ].size() ) ) {
00106 return glengineids[ eid.node ][ eid.engine ];
00107 } else {
00108 return INVALID_GLOBAL_ENGINE_ID;
00109 }
00110 }
00111
00113 gl_engineid_t DistributedNetwork::getGlobalEngineID( engineid_t eng ) const
00114 {
00115 return getGlobalEngineID( SimEngine::ID( mpi_comm.Get_rank(), eng) );
00116 }
00117
00118
00120 gl_engineid_t DistributedNetwork::maxGlobalEngineID(void) const
00121 {
00122 return _max_global_engine_id;
00123 }
00124
00125 gl_engineid_t DistributedNetwork::maxLocalEngineID(void) const
00126 {
00127 return _nThreads-1;
00128 }
00129
00131 const SimEngine::ID DistributedNetwork::getLocation( gl_engineid_t gEID ) const
00132 {
00133 if( gEID < location.size() ) {
00134 return location[gEID];
00135 } else {
00136 return SimEngine::ID::Invalid;
00137 }
00138 }
00139
00140 local_objectid_t DistributedNetwork::getGhostID( gl_engineid_t gEID, SimObjectFactory const& objFactory )
00141 {
00142 object_type_t tid = objFactory.getObjectTypeID();
00143 if( gEID >= objectCounter.size() ) {
00144 return INVALID_LOCAL_OBJECT_ID;
00145 }
00146
00147
00148 if( tid >= objectCounter[gEID].size() ) {
00149 objectCounter[gEID].resize( tid + 1 );
00150 objectCounter[gEID][tid] = 0;
00151
00152 ghostFirstPortInfo[gEID].resize( tid + 1 );
00153 }
00154
00155
00156 local_objectid_t oid = objectCounter[gEID][tid]++;
00157
00158 int nsop = theSimObjectRegistry->getObject( objFactory.getObjectTypeID() ).nSpikeOutputPorts();
00159 if( nsop > 0 ) {
00160 if( oid >= ghostFirstPortInfo[gEID][tid].size() ) {
00161 ghostFirstPortInfo[gEID][tid].resize( oid + 1 );
00162 }
00163 ghostFirstPortInfo[gEID][tid][oid] = spikePortCounter[gEID];
00164 } else {
00165
00166 }
00167
00168 spikePortCounter[gEID] += nsop;
00169
00170 return oid;
00171
00172 }
00173
00174
00175
00176
00177 void DistributedNetwork::_addObject_( const SimObjectFactory &objFactory, const SimEngine::ID &loc, SimObject::ID &id )
00178 {
00179 gl_engineid_t gEID = getGlobalEngineID( loc );
00180
00181 if( gEID != INVALID_GLOBAL_ENGINE_ID ) {
00182 id.node = loc.node;
00183 id.eng = loc.engine;
00184 if( loc.node == mpi_comm.Get_rank() ) {
00185 localSimEngine->addObject( objFactory, id );
00186 } else {
00187 id.type = objFactory.getObjectTypeID();
00188 id.localid = getGhostID( gEID, objFactory );
00189 }
00190 } else {
00191 id = SimObject::ID::Invalid;
00192 throw( PCSIM::ConstructionException( "DistributedSingleThreadNetwork::addObject", "Invalid sim engine id " + loc.toString() + " specified." ) );
00193 }
00194 }
00195
00196 void DistributedNetwork::_mount_( const SimObjectFactory &objFactory, const SimObject::ID &mountpoint, SimObject::ID &id )
00197 {
00198 SimEngine::ID loc( mountpoint.node, mountpoint.eng );
00199 gl_engineid_t gEID = getGlobalEngineID( loc );
00200 if( gEID != INVALID_GLOBAL_ENGINE_ID ) {
00201 id.node = loc.node;
00202 id.eng = loc.engine;
00203 if( loc.node == mpi_comm.Get_rank() ) {
00204 localSimEngine->mount( objFactory, mountpoint, id );
00205 } else {
00206 id.type = objFactory.getObjectTypeID();
00207 id.localid = getGhostID( gEID, objFactory );
00208 }
00209 } else {
00210 id = SimObject::ID::Invalid;
00211 throw( PCSIM::ConstructionException( "DistributedSingleThreadNetwork::mount", "Invalid mountpoint id " + mountpoint.toString() + " specified." ) );
00212 }
00213 }
00214
00215 void DistributedNetwork::_insert_( const SimObjectFactory &objFactory, const SimObject::ID &container, SimObject::ID &id )
00216 {
00217 SimEngine::ID loc( container.node, container.eng );
00218 gl_engineid_t gEID = getGlobalEngineID( loc );
00219 if( gEID != INVALID_GLOBAL_ENGINE_ID ) {
00220 id.node = loc.node;
00221 id.eng = loc.engine;
00222 if( loc.node == mpi_comm.Get_rank() ) {
00223 localSimEngine->insert( objFactory, container, id );
00224 } else {
00225 id.type = objFactory.getObjectTypeID();
00226 id.localid = getGhostID( gEID, objFactory );
00227 }
00228 } else {
00229 id = SimObject::ID::Invalid;
00230 throw( PCSIM::ConstructionException( "DistributedSingleThreadNetwork::insert", "Invalid container id " + container.toString() + " specified." ) );
00231 }
00232 }
00233
00234
00235
00236 void DistributedNetwork::_connect_( SimObject::ID const& sender, port_t out, const SimObject::ID &receiver, port_t in, int delay )
00237 {
00238 int delay_to_use = 0 ;
00239 if( mpi_comm.Get_rank() == receiver.node ) {
00240 SimObject *rec_obj = localSimEngine->getObject(receiver);
00241 delay_to_use = delay < 0 ? (int)( rec_obj->getManagedDelay() / get_dt().in_sec() + 0.5 ) : delay ;
00242 }
00243
00244 if( mpi_comm.Get_rank() == sender.node || mpi_comm.Get_rank() == receiver.node ) {
00245
00246 SimObject::PortType sender_port_type = (theSimObjectRegistry->getObject( sender.type )).outputPortType( out );
00247 SimObject::PortType receiver_port_type = (theSimObjectRegistry->getObject( receiver.type )).inputPortType( in );
00248
00249 if( sender_port_type == SimObject::spiking && receiver_port_type == SimObject::spiking ) {
00250 _addSpikeMessage_( sender, out, getFirstSenderSpikePort( sender ) + out, receiver, in, delay_to_use );
00251 } else if( sender_port_type == SimObject::analog && receiver_port_type == SimObject::analog ) {
00252 addAnalogMessage( sender, out, receiver, in, Time::steps( delay_to_use, get_dt() ) );
00253 } else {
00254 throw(
00255 PCSIM::ConstructionException(
00256 "DistributedNetwork::_connect_",
00257 str( boost::format("Can not connect specified source (%1%) and destination (%2%) object: no matching output (%3%) and input (%4%) ports.") % sender.toString() % receiver.toString() % out % in)
00258 )
00259 );
00260 }
00261 }
00262 }
00263
00264
00265
00266 void DistributedNetwork::_addSpikeMessage_(const SimObject::ID &sender, const port_t out, const SimObject::ID &receiver, const port_t in, const Time &delay)
00267 {
00268 SimObject::PortType sender_port_type = (theSimObjectRegistry->getObject( sender.type )).outputPortType( out );
00269 SimObject::PortType receiver_port_type = (theSimObjectRegistry->getObject( receiver.type )).inputPortType( in );
00270 if( sender_port_type == SimObject::spiking && receiver_port_type == SimObject::spiking ) {
00271 _addSpikeMessage_( sender, out, getFirstSenderSpikePort(sender)+ out, receiver, in, delay.in_steps( get_dt() ) );
00272 } else {
00273 throw(
00274 PCSIM::ConstructionException(
00275 "DistributedSingleThreadNetwork::_connect_",
00276 str( boost::format("Can not add spike message from source (%1%) to destination (%2%) object: no matching out (%3%) and in (%4%) ports.") % sender.toString() % receiver.toString() % out % in)
00277 )
00278 );
00279 }
00280 }
00281
00282
00283 ostream& operator<<(ostream &s, const DistributedNetwork::ConnectInfo &ci)
00284 {
00285 return s << "("
00286
00287 << "sp=" << (int)ci.first_spike_output_port_id << ", "
00288 << "si=" << (int)ci.nSpikeInputPorts << ", "
00289 << "so=" << (int)ci.nSpikeOutputPorts << ", "
00290 << "ai=" << (int)ci.nAnalogInputPorts << ", "
00291 << "ao=" << (int)ci.nAnalogOutputPorts << ")";
00292 }
00293
00294 spike_port_id_t DistributedNetwork::getFirstSenderSpikePort( const SimObject::ID &oid )
00295 {
00296 if ( oid.node == mpi_comm.Get_rank() ) {
00297 SimObject *src_obj = localSimEngine->getObject(oid);
00298
00299
00300
00301
00302
00303 SpikeSender *ss = dynamic_cast<SpikeSender*>( src_obj );
00304 if( ss != NULL && ss->getSpikePort( 0 ) != NULL ) {
00305 return ss->getSpikePort( 0 )->ID();
00306 } else {
00307 return INVALID_SPIKE_PORT_ID;
00308 }
00309 } else {
00310 SimEngine::ID loc( oid.node, oid.eng );
00311 gl_engineid_t gEID = getGlobalEngineID( loc );
00312 return ghostFirstPortInfo[gEID][oid.type][oid.localid];
00313 }
00314 }