00001 #include "MPIOutputBuffer.h"
00002
00003 #include <cassert>
00004
00005 #include <cstring>
00006
00007
00008
00009 MPIOutputBuffer::MPIOutputBuffer(vector<gl_engineid_t> &engIDs)
00010 : initialized(false), mpiOutputSpikeBuffer(engIDs), slicer(MPIBufferSlicer::tpOutputBuffer)
00011 {
00012 analogMsgCounters.resize(engIDs.size(), 0);
00013 }
00014
00015
00016 MPIOutputBuffer::~MPIOutputBuffer()
00017 {
00018 if (initialized)
00019 if (slicer.thereIsMixedDataType)
00020 mixedMPIDataType.Free();
00021 }
00022
00023 void MPIOutputBuffer::startNewMPIExchange()
00024 {
00025 slicer.reset();
00026 }
00027
00028 void MPIOutputBuffer::initialize(MPIMessageSpec msgSpec, size_t spikeBufferSize, size_t maxMPIMsgSize,
00029 void *baseBufferPtr, void *analogBuffer, void *spikeBuffer)
00030 {
00031 this->baseBufferPtr = baseBufferPtr;
00032 currentMsgInfo = msgSpec;
00033 analog_buf = analogBuffer;
00034 spike_buf = spikeBuffer;
00035
00036 slicer.initialize((char *)spikeBuffer - (char *)analogBuffer,
00037 maxMPIMsgSize, spikeBufferSize);
00038
00039
00040 size_t analog_buffer_size_elements = (double *)spikeBuffer - (double *)analogBuffer;
00041
00042 spike_buffer_size_elements = spikeBufferSize / sizeof(MPIOutputSpikeBuffer<>::coding_element_type);
00043
00044 mpiOutputSpikeBuffer.initialize((MPIOutputSpikeBuffer<>::coding_element_type *)spikeBuffer, spike_buffer_size_elements);
00045
00046 slicer.initialize(analog_buffer_size_elements,
00047 maxMPIMsgSize, spikeBufferSize);
00048
00049 analogMPIDatatype = MPI::DOUBLE;
00050 if (sizeof(MPIOutputSpikeBuffer<>::coding_element_type) == 4)
00051 spikingMPIDatatype = MPI::LONG;
00052 else
00053 spikingMPIDatatype = MPI::SHORT;
00054
00055
00056 if (slicer.thereIsMixedDataType) {
00057
00058 mixed_data_types[0] = analogMPIDatatype;
00059 mixed_data_types[1] = spikingMPIDatatype;
00060
00061 if (maxMPIMsgSize == 0)
00062 mixedCounts[0] = analog_buffer_size_elements;
00063 else
00064 mixedCounts[0] = analog_buffer_size_elements % (maxMPIMsgSize / sizeof(double));
00065 mixedCounts[1] = spikeBufferSize / sizeof(MPIOutputSpikeBuffer<>::coding_element_type);
00066
00067 mixedDisplacements[0] = 0;
00068 mixedDisplacements[1] = mixedCounts[0] * sizeof(double);
00069
00070
00071 mixedMPIDataType = MPI::Datatype::Create_struct(2, mixedCounts, mixedDisplacements, mixed_data_types);
00072 }
00073
00074 initialized = true;
00075 }
00076
00077 bool MPIOutputBuffer::hasNextBufferSlice()
00078 {
00079 if (slicer.currentSliceType == MPIBufferSlicer::sliceAnalog ||
00080 slicer.currentSliceType == MPIBufferSlicer::sliceUndefined ) {
00081 if (totalAnalogMsgCounter)
00082 return true;
00083 else {
00084 return mpiOutputSpikeBuffer.hasNextSerializedMPIBuffer();
00085 }
00086 }
00087 return mpiOutputSpikeBuffer.hasNextSerializedMPIBuffer();
00088 }
00089
00090 MPIMessageSpec & MPIOutputBuffer::prepareNextBufferSlice()
00091 {
00092 currentMsgInfo.hasContent = hasNextBufferSlice();
00093
00094 slicer.calcNextBufferSliceDimensions();
00095
00096
00097 switch (slicer.currentSliceType) {
00098 case MPIBufferSlicer::sliceAnalog :
00099 *currentMsgInfo.displacement = ((char *)analog_buf + slicer.currentSlicePos) - (char *)baseBufferPtr;
00100 *currentMsgInfo.buffer = (void *)((char *)analog_buf + slicer.currentSlicePos);
00101 *currentMsgInfo.count = slicer.currentAnalogSliceSize / sizeof(double);
00102 *currentMsgInfo.datatype = analogMPIDatatype;
00103 currentMsgInfo.content_type = MPIMessageSpec::contentAnalog;
00104 break;
00105 case MPIBufferSlicer::sliceMixed :
00106 mixedCounts[1] = mpiOutputSpikeBuffer.prepareNextSerializedMPIBuffer(slicer.allowedSpikeSliceSize/sizeof(MPIOutputSpikeBuffer<>::coding_element_type));
00107
00108 mixedMPIDataType.Free();
00109 mixedMPIDataType = MPI::Datatype::Create_struct(2, mixedCounts, mixedDisplacements, mixed_data_types);
00110 mixedMPIDataType.Commit();
00111 *currentMsgInfo.displacement = (char *)( (double *)spike_buf - mixedCounts[0] ) - (char *)baseBufferPtr;
00112 *currentMsgInfo.buffer = (void *)( (double *)spike_buf - mixedCounts[0] ) ;
00113 *currentMsgInfo.datatype = mixedMPIDataType;
00114 *currentMsgInfo.count = 1;
00115 currentMsgInfo.content_type = MPIMessageSpec::contentMixed;
00116 break;
00117 case MPIBufferSlicer::sliceSpiking :
00118 *currentMsgInfo.displacement = (char *)spike_buf - (char *)baseBufferPtr;
00119 *currentMsgInfo.buffer = spike_buf;
00120 *currentMsgInfo.count = mpiOutputSpikeBuffer.prepareNextSerializedMPIBuffer(slicer.allowedSpikeSliceSize/sizeof(MPIOutputSpikeBuffer<>::coding_element_type));;
00121 *currentMsgInfo.datatype = spikingMPIDatatype;
00122 currentMsgInfo.content_type = MPIMessageSpec::contentSpiking;
00123 break;
00124 case MPIBufferSlicer::sliceUndefined :
00125 assert( 0 );
00126 }
00127 return currentMsgInfo;
00128 }
00129
00130 MPIMessageSpec& MPIOutputBuffer::getCurrentMPIMsgSpecHolder()
00131 {
00132 return currentMsgInfo;
00133 }
00134
00135 void *MPIOutputBuffer::getBuffer()
00136 {
00137 return (void *)analog_buf;
00138 }
00139
00140 void MPIOutputBuffer::setFinishedFlag(bool finished)
00141 {
00142 mpiOutputSpikeBuffer.setFinishedFlag(finished);
00143 }
00144
00145 void MPIOutputBuffer::nextCycle()
00146 {
00147 mpiOutputSpikeBuffer.nextCycle();
00148 }
00149
00150 MPIOutputSpikeBuffer<> & MPIOutputBuffer::mpiOutputSpikingBuffer()
00151 {
00152 return mpiOutputSpikeBuffer;
00153 }
00154
00155
00156 MPIOutputBufferVector::MPIOutputBufferVector(int numBuffers, vector<gl_engineid_t> &engIDs)
00157 : initialized(false), nNodes(numBuffers), mpiExchBlocksInfo(numBuffers)
00158 {
00159 _buffers.resize(nNodes, MPIOutputBuffer(engIDs));
00160 }
00161
00162 MPIOutputBufferVector::~MPIOutputBufferVector()
00163 {
00164 if (initialized)
00165 delete [] memoryPool;
00166 }
00167
00168
00169 void MPIOutputBufferVector::initialize(int minDelay, size_t maxMPIMessageSize,
00170 size_t spikeBufferSize)
00171 {
00172
00173 spike_buffer_size = spikeBufferSize;
00174 int pool_size = 0;
00175 for (int i = 0; i < nNodes; ++i) {
00176 _buffers[i].calculateTotalAnalogMsgCounter();
00177
00178 pool_size += _buffers[i].totalAnalogMsgCounter * sizeof(double) * minDelay + spikeBufferSize;
00179 }
00180
00181 memoryPool = new char[pool_size];
00182
00183 memset(memoryPool, 0, pool_size);
00184
00185
00186 void * analogBufPtr = (void *)memoryPool;
00187 for (int i = 0; i < nNodes; ++i) {
00188 void *spikeBufPtr = (void *)((char *)analogBufPtr + _buffers[i].totalAnalogMsgCounter * sizeof(double) * minDelay);
00189 _buffers[i].initialize(mpiExchBlocksInfo.getMsgSpec(i), spike_buffer_size, maxMPIMessageSize,
00190 memoryPool, analogBufPtr, spikeBufPtr);
00191 analogBufPtr = (char *)analogBufPtr + _buffers[i].totalAnalogMsgCounter * sizeof(double) * minDelay + spike_buffer_size;
00192 }
00193
00194 nextCycle();
00195
00196 initialized = true;
00197 }
00198
00199 void MPIOutputBufferVector::startNewMPIExchange()
00200 {
00201 vector<MPIOutputBuffer>::iterator buf_it;
00202 for (buf_it = _buffers.begin() ; buf_it != _buffers.end() ; ++buf_it)
00203 buf_it->startNewMPIExchange();
00204
00205 }
00206
00207 void MPIOutputBufferVector::prepareNextBufferSlices()
00208 {
00209 vector<MPIOutputBuffer>::iterator buf_it;
00210 for (buf_it = _buffers.begin() ; buf_it != _buffers.end() ; ++buf_it)
00211 buf_it->prepareNextBufferSlice();
00212 }
00213
00214
00215 MPIExchangeBlocksInfo & MPIOutputBufferVector::getMPIExchangeBlocksInfo()
00216 {
00217 return mpiExchBlocksInfo;
00218 }