00001 #ifndef MPIOutputSpikeBuffer_H_
00002 #define MPIOutputSpikeBuffer_H_
00003
00004 #include "globaldefinitions.h"
00005 #include <vector>
00006 #include <algorithm>
00007 #include <cstddef>
00008
00009 using std::vector;
00010 using std::copy;
00011
00012 #include <iostream>
00013 using std::cout;
00014 using std::endl;
00015
00016
00017 template<typename T>
00018 class MPIOutputSpikeBufferVector;
00019
00021
00030 template<typename T = unsigned short>
00031 class MPIOutputSpikeBuffer
00032 {
00033 public:
00035
00042 MPIOutputSpikeBuffer(vector<gl_engineid_t> &engIDs);
00043
00044
00046
00049 typedef T coding_element_type ;
00050
00052 T guard_value;
00053
00054 virtual ~MPIOutputSpikeBuffer();
00055
00056 void initialize(void * spikeBuffer, size_t spikeBufferSize);
00057
00059 void appendSpike(local_objectid_t oid, int timestamp, engineid_t eng = 0);
00060
00062 void nextCycle();
00063
00065
00068 int prepareNextSerializedMPIBuffer(size_t currentBufferSize);
00069
00070 int prepareNextSerializedMPIBuffer()
00071 {
00072 return prepareNextSerializedMPIBuffer(buf_size);
00073 };
00074
00076 bool hasNextSerializedMPIBuffer();
00077
00079 const T* const getBuffer();
00080
00082 void setFinishedFlag(bool flag);
00083
00084 protected:
00085
00086
00088 int nLocalEngines;
00089
00091 size_t buf_size;
00092
00094
00098 vector<vector<T> > _buffers;
00099
00101 vector<gl_engineid_t> *gl_engineids;
00102
00104 T* serialBuffer;
00105
00107
00112 vector<int> timestamps;
00113
00115 bool isInit;
00116
00118 void initSerialization();
00119
00121 gl_engineid_t s_curr_eng;
00122
00124 int s_curr_timestamp;
00125
00126 typedef typename vector<T>::const_iterator vector_iterator;
00127
00129 vector_iterator s_curr_src_pos;
00130
00132 vector_iterator s_src_end_pos;
00133
00135 bool serializationFinished;
00136
00137 friend class MPIOutputSpikeBufferVector<T>;
00138
00139 };
00140
00142
00150 template<typename T = unsigned short>
00151 class MPIOutputSpikeBufferVector
00152 {
00153 public:
00154
00163 MPIOutputSpikeBufferVector(int numBuffers, vector<gl_engineid_t> &engIDs,
00164 size_t buffer_size = MPIBUFFER_BLOCK_SIZE);
00165
00166 ~MPIOutputSpikeBufferVector();
00167
00169 T *getBuffersPool()
00170 {
00171 return serialBuffersPool;
00172 }
00173
00175 void nextCycle()
00176 {
00177 for (int i = 0; i < nNodes; ++i) {
00178 _outputbuffers[i]->nextCycle();
00179 }
00180 }
00181
00183 void setFinishedFlag(bool flag)
00184 {
00185 for (int i = 0; i < nNodes; ++i) {
00186 _outputbuffers[i]->setFinishedFlag(flag);
00187 }
00188 }
00189
00191 MPIOutputSpikeBuffer<T> & operator[] (int idx)
00192 {
00193 return *_outputbuffers[idx];
00194 }
00195
00196 private:
00197
00199 int nNodes;
00200
00202 size_t buf_size;
00203
00205 vector<MPIOutputSpikeBuffer<T>*> _outputbuffers;
00206
00208
00211 T *serialBuffersPool;
00212 };
00213
00214 template<typename T>
00215 MPIOutputSpikeBuffer<T>::MPIOutputSpikeBuffer(vector<gl_engineid_t> &engIDs) :
00216 gl_engineids(&engIDs)
00217 {
00218 guard_value = 0xFF;
00219 for (unsigned int i = 1; i < sizeof(T); ++i)
00220 guard_value = (guard_value << 8) + 0xFF;
00221
00222 nLocalEngines = gl_engineids->size();
00223
00224 _buffers.resize(nLocalEngines);
00225 timestamps.resize(nLocalEngines, int(-1));
00226 for (int j = 0 ; j < nLocalEngines ; ++j) {
00227 _buffers[j].push_back((*gl_engineids)[j]);
00228 }
00229
00230 serializationFinished = false;
00231 }
00232
00233 template<typename T>
00234 void MPIOutputSpikeBuffer<T>::initialize(void * spikeBuffer,
00235 size_t spikeBufferSize)
00236 {
00237 serialBuffer = (T *)spikeBuffer;
00238 buf_size = spikeBufferSize;
00239 }
00240
00241 template<typename T>
00242 MPIOutputSpikeBuffer<T>::~MPIOutputSpikeBuffer()
00243 {}
00244
00245 template<typename T>
00246 void MPIOutputSpikeBuffer<T>::appendSpike(local_objectid_t oid,
00247 int timestamp, engineid_t eng)
00248 {
00249 if (timestamps[eng] < timestamp) {
00250 timestamps[eng] = timestamp;
00251 _buffers[eng].push_back(guard_value);
00252 _buffers[eng].push_back(timestamp);
00253 }
00254 _buffers[eng].push_back(oid);
00255 }
00256
00257 template<typename T>
00258 void MPIOutputSpikeBuffer<T>::nextCycle()
00259 {
00260 for (int i = 0 ; i < nLocalEngines ; ++i) {
00261 _buffers[i].resize(1);
00262 timestamps[i] = -1;
00263 }
00264 isInit = false;
00265 }
00266
00267 template<typename T>
00268 void MPIOutputSpikeBuffer<T>::initSerialization()
00269 {
00270
00271 serializationFinished = false;
00272 engineid_t first_non_empty = 0;
00273 for (int j = 0 ; j < nLocalEngines; ++j) {
00274 _buffers[j].push_back(guard_value);
00275 }
00276 while (first_non_empty < nLocalEngines && _buffers[first_non_empty].size() == 2)
00277 first_non_empty++;
00278 if (first_non_empty == nLocalEngines) {
00279 serializationFinished = true;
00280 s_curr_timestamp = -1;
00281 } else {
00282 s_curr_eng = first_non_empty;
00283 s_curr_timestamp = -1;
00284 s_curr_src_pos = _buffers[first_non_empty].begin();
00285 s_src_end_pos = _buffers[first_non_empty].end();
00286 }
00287 }
00288
00289 template<typename T>
00290 const T* const MPIOutputSpikeBuffer<T>::getBuffer()
00291 {
00292 return serialBuffer;
00293 }
00294
00295 template<typename T>
00296 void MPIOutputSpikeBuffer<T>::setFinishedFlag(bool flag)
00297 {
00298 *(serialBuffer+1) = flag;
00299 }
00300
00301
00302 template<typename T>
00303 int MPIOutputSpikeBuffer<T>::prepareNextSerializedMPIBuffer(size_t currentBufferSize)
00304 {
00305 if (!isInit) {
00306 initSerialization();
00307 isInit = true;
00308 }
00309
00310 T *dest_pos = serialBuffer;
00311 T *dest_end_pos = serialBuffer + currentBufferSize;
00312 bool withoutHeader = true;
00313
00314
00315
00316 dest_pos+=2;
00317
00318 if (serializationFinished) {
00319 int length = 2;
00320 *serialBuffer = length ;
00321 *(serialBuffer + 1) = (T)serializationFinished;
00322 return length;
00323 }
00324
00325
00326 if (s_curr_timestamp != -1) {
00327 *(dest_pos++) = (*gl_engineids)[s_curr_eng];
00328 *(dest_pos++) = guard_value;
00329 *(dest_pos++) = s_curr_timestamp;
00330 withoutHeader = false;
00331 }
00332
00333 while (!serializationFinished &&
00334 (s_src_end_pos - s_curr_src_pos) <= ((dest_end_pos - dest_pos)-1) ) {
00335 copy(s_curr_src_pos, s_src_end_pos, dest_pos);
00336 dest_pos += s_src_end_pos - s_curr_src_pos;
00337
00338 s_curr_eng++;
00339 while (s_curr_eng < nLocalEngines && _buffers[s_curr_eng].size() == 2)
00340 s_curr_eng++;
00341
00342
00343 if (s_curr_eng == nLocalEngines) {
00344 serializationFinished = true;
00345 } else {
00346 s_curr_timestamp = -1;
00347 s_curr_src_pos = _buffers[s_curr_eng].begin();
00348 s_src_end_pos = _buffers[s_curr_eng].end();
00349 }
00350 *(dest_pos++) = guard_value;
00351 withoutHeader = true;
00352 }
00353
00354 if (!serializationFinished) {
00355
00356
00357 typename vector<T>::const_iterator last_possible_pos = s_curr_src_pos + ((dest_end_pos - dest_pos) -2);
00358
00359 if (last_possible_pos - s_curr_src_pos >= 2) {
00360 if (*(last_possible_pos-2) == guard_value )
00361 last_possible_pos-=2;
00362 else
00363 if (*(last_possible_pos-1) == guard_value )
00364 last_possible_pos-- ;
00365 }
00366
00367 if (withoutHeader) {
00368
00369 if (last_possible_pos - s_curr_src_pos >= 4) {
00370 *(dest_pos++) = (*gl_engineids)[s_curr_eng];
00371 *(dest_pos++) = guard_value;
00372 *(dest_pos++) = *(s_curr_src_pos + 2);
00373 s_curr_src_pos += 3;
00374 }
00375 }
00376
00377 if (last_possible_pos - s_curr_src_pos > 0) {
00378 copy(s_curr_src_pos, last_possible_pos, dest_pos);
00379 dest_pos += last_possible_pos - s_curr_src_pos;
00380 s_curr_src_pos = last_possible_pos;
00381 if (*s_curr_src_pos == guard_value)
00382 s_curr_src_pos+=2;
00383 typename vector<T>::const_iterator timestamp_pos = last_possible_pos;
00384 while (*timestamp_pos != guard_value)
00385 timestamp_pos--;
00386 s_curr_timestamp = *(timestamp_pos+1);
00387 *(dest_pos++) = guard_value;
00388 *(dest_pos++) = guard_value;
00389 }
00390 }
00391
00392
00393 int length = dest_pos - serialBuffer;
00394 *serialBuffer = length ;
00395 *(serialBuffer+1) = (T)serializationFinished;
00396
00397 return length;
00398 }
00399
00400 template<typename T>
00401 bool MPIOutputSpikeBuffer<T>::hasNextSerializedMPIBuffer()
00402 {
00403 return !serializationFinished;
00404 }
00405
00406
00407 template<typename T>
00408 MPIOutputSpikeBufferVector<T>::MPIOutputSpikeBufferVector(int numBuffers,
00409 vector<gl_engineid_t> &engIDs,
00410 size_t buffer_size) :
00411 nNodes(numBuffers), buf_size(buffer_size)
00412 {
00413
00414 serialBuffersPool = new T[nNodes*buf_size];
00415
00416 _outputbuffers.resize(nNodes);
00417
00418 for (int i = 0; i < nNodes; ++i) {
00419 _outputbuffers[i] = new MPIOutputSpikeBuffer<T>(engIDs);
00420 _outputbuffers[i]->initialize(serialBuffersPool + i*buf_size, buf_size);
00421 }
00422
00423 nextCycle();
00424 }
00425
00426
00427 template<typename T>
00428 MPIOutputSpikeBufferVector<T>::~MPIOutputSpikeBufferVector()
00429 {
00430 for (int i = 0; i < nNodes; ++i) {
00431 delete _outputbuffers[i];
00432 }
00433
00434 delete [] serialBuffersPool;
00435 }
00436
00437
00438 #endif