00001 #include "DistributedIncomingAnalogMsgDispatcher.h" 00002 00003 #include <algorithm> 00004 00005 using std::max; 00006 00007 DistributedIncomingAnalogMsgDispatcher::DistributedIncomingAnalogMsgDispatcher(delay_t minimumDelay, MPIInputBuffer *mpiInputBuffer, 00008 DistIncomingAnalogSources2BufPosMap *bufferPositionsOfIncomingSources) 00009 : minDelay(minimumDelay), bufPosOfIncomingSources(bufferPositionsOfIncomingSources), inputBuffer(mpiInputBuffer) 00010 { 00011 initialized = false; 00012 cycled_buf_pos_source_eng_ids = new vector< engineid_t >; 00013 00014 //port_msgs_positions = new buff_pos2msglist_pos_type; 00015 //field_msgs_positions = new buff_pos2msglist_pos_type; 00016 // this should be shared among every dispatcher belonging to the same incoming buffer (from different engines) 00017 // buffPosOfIncomingSources = new DistIncomingAnalogSources2BufPosMap(); 00018 } 00019 00020 DistributedIncomingAnalogMsgDispatcher::~DistributedIncomingAnalogMsgDispatcher() 00021 { 00022 if (!initialized) { 00023 // delete field_msgs_positions; 00024 // delete port_msgs_positions; 00025 delete cycled_buf_pos_source_eng_ids; 00026 } 00027 00028 // delete analog delay objects that are created inside the bufPosOfIncomingSources? 00029 vector< IncomingAnalogDelayObject *>::iterator delayer_it; 00030 for (delayer_it = cycled_inc_msgs_delayer_destinations.begin(); 00031 delayer_it != cycled_inc_msgs_delayer_destinations.end(); 00032 ++delayer_it) { 00033 delete (*delayer_it); 00034 } 00035 00036 } 00037 00038 /* 00039 void DistributedIncomingAnalogMsgDispatcher::dispatchMPIIncomingAnalogMsgs(int step) 00040 { 00041 // dispatch first port messages 00042 port_incoming_analog_msgs_type::const_iterator port_msg_it; 00043 vector< unsigned >::const_iterator port_buff_pos_iter = port_msgs_buff_pos.begin(); 00044 for (port_msg_it = port_analog_msgs.begin() ; port_msg_it != port_analog_msgs.end() ; ++port_msg_it) { 00045 double v = *(mpiBufferPtr + minDelay*(*port_buff_pos_iter) + step); 00046 vector< pair<SimObject *,analog_port_id_t> >::const_iterator port_dest_it; 00047 for (port_dest_it = port_msg_it->begin(); port_dest_it != port_msg_it->end() ; ++port_dest_it) 00048 port_dest_it->first->setAnalogInput(v, port_dest_it->second); 00049 ++port_buff_pos_iter; 00050 } 00051 00052 // then dispatch field messages 00053 field_incoming_analog_msgs_type::iterator field_msg_it; 00054 vector< unsigned >::const_iterator field_buff_pos_iter = field_msgs_buff_pos.begin(); 00055 for (field_msg_it = field_analog_msgs.begin() ; field_msg_it != field_analog_msgs.end() ; ++field_msg_it) { 00056 double v = *(mpiBufferPtr+ minDelay*(*field_buff_pos_iter) + step); 00057 vector< double *>::iterator field_dest_it ; 00058 for (field_dest_it = field_msg_it->begin() ; field_dest_it != field_msg_it->end() ; ++field_dest_it) 00059 *(*field_dest_it) = v; 00060 ++field_buff_pos_iter; 00061 } 00062 } 00063 */ 00064 00065 void DistributedIncomingAnalogMsgDispatcher::dispatchDelayerCycledIncomingAnalogMsgs() 00066 { 00067 vector< unsigned >::const_iterator src_it; 00068 vector< IncomingAnalogDelayObject * >::iterator dest_it; 00069 for (src_it = cycled_inc_msgs_src_buff_pos.begin(), 00070 dest_it = cycled_inc_msgs_delayer_destinations.begin(); 00071 src_it != cycled_inc_msgs_src_buff_pos.end(); 00072 ++src_it, ++dest_it) { 00073 (*dest_it)->putNewCycleValues(mpiBufferPtr + minDelay*(*src_it)); 00074 } 00075 } 00076 00077 00078 void DistributedIncomingAnalogMsgDispatcher::initialize() 00079 { 00080 if (!initialized) { 00081 00082 mpiBufferPtr = inputBuffer->getAnalogBuffer(); 00083 00084 // adjust buffer positions 00085 vector< unsigned >::iterator it; 00086 vector< engineid_t >::iterator src_eng_it = cycled_buf_pos_source_eng_ids->begin(); 00087 for ( it = cycled_inc_msgs_src_buff_pos.begin() ; it != cycled_inc_msgs_src_buff_pos.end(); ++it, ++src_eng_it) 00088 *it += ( *(src_eng_it) ? inputBuffer->getAnalogMsgCounter(*(src_eng_it) - 1) : 0); 00089 00090 00091 delete cycled_buf_pos_source_eng_ids; 00092 00093 // initialize delayers 00094 vector< IncomingAnalogDelayObject *>::iterator delayer_it; 00095 for (delayer_it = cycled_inc_msgs_delayer_destinations.begin(); 00096 delayer_it != cycled_inc_msgs_delayer_destinations.end(); 00097 ++delayer_it) { 00098 (*delayer_it)->initDistributedMode(); 00099 } 00100 00101 //delete field_msgs_positions; 00102 //delete port_msgs_positions; 00103 initialized = true; 00104 } 00105 } 00106 00107 00108 00109 void DistributedIncomingAnalogMsgDispatcher::reset(double dt) 00110 { 00111 // initialize delayers 00112 vector< IncomingAnalogDelayObject *>::iterator delayer_it; 00113 for (delayer_it = cycled_inc_msgs_delayer_destinations.begin(); 00114 delayer_it != cycled_inc_msgs_delayer_destinations.end(); 00115 ++delayer_it) { 00116 (*delayer_it)->reset(dt); 00117 } 00118 }