00001 #ifndef MPIOUTPUTBUFFER_H_
00002 #define MPIOUTPUTBUFFER_H_
00003
00004
00005 #include <numeric>
00006
00007 using std::accumulate;
00008 using std::partial_sum;
00009
00010 #include <mpi.h>
00011
00012 #include "globaldefinitions.h"
00013 #include "MPIBufferSlicer.h"
00014 #include "MPIOutputSpikeBuffer.h"
00015 #include "MPIExchangeBlocksInfo.h"
00016
00017 using std::cerr;
00018
00019 class MPIOutputBufferVector;
00020
00021 class MPIOutputBuffer
00022 {
00023 public:
00024 MPIOutputBuffer(vector<gl_engineid_t> &engIDs);
00025
00026 virtual ~MPIOutputBuffer();
00027
00028 double *getAnalogBuffer()
00029 {
00030 return (double *)analog_buf;
00031 }
00032
00033 unsigned & getAnalogMsgCounter(engineid_t eng = 0)
00034 {
00035 return analogMsgCounters[eng];
00036 }
00037
00038 void calculateTotalAnalogMsgCounter()
00039 {
00040 totalAnalogMsgCounter = accumulate(analogMsgCounters.begin(), analogMsgCounters.end(), 0);
00041 partial_sum(analogMsgCounters.begin(), analogMsgCounters.end(), analogMsgCounters.begin());
00042 }
00043
00044 void startNewMPIExchange();
00045
00046 bool hasNextBufferSlice();
00047
00048 MPIMessageSpec & prepareNextBufferSlice();
00049
00050 void *getBuffer();
00051
00052 void setFinishedFlag(bool finished);
00053
00054 void nextCycle();
00055
00056 MPIOutputSpikeBuffer<> &spikeBuf()
00057 {
00058 return mpiOutputSpikeBuffer;
00059 };
00060
00061
00062 MPIOutputSpikeBuffer<> & mpiOutputSpikingBuffer();
00063
00064 MPIMessageSpec &getCurrentMPIMsgSpecHolder();
00065
00066
00067 int getMixedCounts(int idx)
00068 {
00069 return mixedCounts[idx];
00070 }
00071
00072
00073 int getMixedDispl(int idx)
00074 {
00075 return mixedDisplacements[idx];
00076 }
00077
00078 void getMPIExchangeBlocksInfo();
00079
00080 protected:
00081 bool initialized;
00082
00083 void *analog_buf;
00084
00085 void *spike_buf;
00086
00087 void *baseBufferPtr;
00088
00089 MPI::Datatype mixedMPIDataType;
00090
00091 MPI::Datatype spikingMPIDatatype;
00092
00093 MPI::Datatype analogMPIDatatype;
00094
00095 MPIMessageSpec currentMsgInfo;
00096
00097 MPI::Datatype mixed_data_types[2];
00098
00099 void initialize(MPIMessageSpec msgSpec, size_t spikeBufferSize, size_t maxMPIMsgSize,
00100 void *baseBufferPtr, void *analogBuffer, void *spikeBuffer);
00101
00102
00103 int mixedCounts[2];
00104
00105
00106 MPI::Aint mixedDisplacements[2];
00107
00108 size_t spike_buffer_size_elements;
00109
00110 vector<unsigned> analogMsgCounters;
00111
00112 unsigned totalAnalogMsgCounter;
00113
00114 MPIOutputSpikeBuffer<> mpiOutputSpikeBuffer;
00115
00116 MPIBufferSlicer slicer;
00117
00118 friend class MPIOutputBufferVector;
00119 };
00120
00121
00123
00131 class MPIOutputBufferVector
00132 {
00133 public:
00134
00142 MPIOutputBufferVector(int numBuffers, vector<gl_engineid_t> &engIDs);
00143
00144 virtual ~MPIOutputBufferVector();
00145
00146
00147 void initialize(int minDelay, size_t maxMPIMessageSize = 0,
00148 size_t spikeBufferSize = MPIBUFFER_BLOCK_SIZE);
00149
00151 void *getBaseBufferPtr()
00152 {
00153 return (void *)memoryPool;
00154 }
00155
00157 void nextCycle()
00158 {
00159 for (int i = 0; i < nNodes; ++i) {
00160 _buffers[i].nextCycle();
00161 }
00162 }
00163
00165 void setFinishedFlag(bool flag)
00166 {
00167 for (int i = 0; i < nNodes; ++i) {
00168 _buffers[i].setFinishedFlag(flag);
00169 }
00170 }
00171
00173 MPIOutputBuffer & operator[] (int idx)
00174 {
00175 return _buffers[idx];
00176 }
00177
00178 void startNewMPIExchange();
00179
00180 void prepareNextBufferSlices();
00181
00182 MPIExchangeBlocksInfo & getMPIExchangeBlocksInfo();
00183
00184 private:
00185 bool initialized;
00186
00188 int nNodes;
00189
00191 size_t spike_buffer_size;
00192
00194 vector<MPIOutputBuffer> _buffers;
00195
00197
00200 char *memoryPool;
00201
00202 MPIExchangeBlocksInfo mpiExchBlocksInfo;
00203 };
00204
00205 #endif