SparseLibrary  Version 1.6.0
RDBHilbert.hpp
1 /*
2  * Copyright (c) 2007-2014, A. N. Yzelman, Utrecht University 2007-2011;
3  * KU Leuven 2011-2014.
4  * R. H. Bisseling, Utrecht University 2007-2014.
5  *
6  * This file is part of the Sparse Library.
7  *
8  * This library was developed under supervision of Prof. dr. Rob H. Bisseling at
9  * Utrecht University, from 2007 until 2011. From 2011-2014, development continued
10  * at KU Leuven, where Prof. dr. Dirk Roose contributed significantly to the ideas
11  * behind the newer parts of the library code.
12  *
13  * The Sparse Library is free software: you can redistribute it and/or modify
14  * it under the terms of the GNU General Public License as published by the
15  * Free Software Foundation, either version 3 of the License, or (at your
16  * option) any later version.
17  *
18  * The Sparse Library is distributed in the hope that it will be useful, but
19  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
20  * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21  * for more details.
22  *
23  * You should have received a copy of the GNU General Public License along
24  * with the Sparse Library. If not, see <http://www.gnu.org/licenses/>.
25  */
26 
27 
28 /*
29  * File created by:
30  * A. N. Yzelman, Dept. of Computer Science, KU Leuven, 2011.
31  */
32 
33 
34 #include <iostream>
35 #include <vector>
36 #include <map>
37 #include <pthread.h>
38 
39 #ifndef _NO_LIBNUMA
40  #include <numa.h>
41 #endif
42 
43 //compiling with sniper will instrument a call to a repeated SpMV (zax)
44 #ifdef _WITH_SNIPER
45  #include "sim_api.h"
46 #endif
47 
48 //#include "hwloc.h"
49 
50 #include "SparseMatrix.hpp"
51 #include "Matrix2HilbertCoordinates.hpp"
52 #include "FBICRS.hpp"
53 #include "MachineInfo.hpp"
54 #include "BigInt.hpp"
55 
56 #ifndef _H_RDBHILBERT
57 #define _H_RDBHILBERT
58 
59 /*
60  * When defined, thread 0 will use the global y vector for its local
61  * computations. This introduces extra work in the form of one sync,
62  * and the amount of available processors for the parallel collect.
63  * The advantage is less data replication of the output vector.
64  *
65  * The synchronisation cannot be prevented. Using all processors in
66  * the collect code is possible but has not been programmed currently.
67  */
68 #define RDBH_GLOBAL_Y
69 
70 #ifndef _TESTMODE
71  /*
72  * When defined, RDBHilbert will not collect output results in the
73  * global output vector passed to this library. Used for timing
74  * the true SpMV speeds only.
75  */
76  #define RDBH_NO_COLLECT
77 #endif
78 
80 template< typename T >
82 
83  public:
84 
86  size_t id;
87 
89  size_t P;
90 
95  unsigned char mode;
96 
98  size_t repeat;
99 
101  //size_t k;
102 
104  std::vector< Triplet< T > > *original;
105 
107  size_t *nzb;
108 
110  double time;
111 
113  size_t bytes;
114 
116  size_t fillIn;
117 
119  pthread_mutex_t* mutex;
120 
122  pthread_cond_t* cond;
123 
125  pthread_mutex_t* end_mutex;
126 
128  pthread_cond_t* end_cond;
129 
131  size_t *sync;
132 
134  size_t *end_sync;
135 
138 
141 
144 
146  const T ** input;
147 
149  T ** output;
150 
152  //const T *** inputX;
153 
155  //T *** outputZ;
156 
158  //T ** local_Y;
159 
161  RDB_shared_data(): id( -1 ), P( -1 ), mode( 0 ), repeat( 0 ),
162  //k( 0 ),
163  original( NULL ), nzb( NULL ), time( 0 ),
164  mutex( NULL ), cond( NULL ), end_mutex( NULL ), end_cond( NULL ),
165  sync( NULL ), end_sync( NULL ),
167  local_y( NULL ), input( NULL ), output( NULL )//,
168  //inputX( NULL ), outputZ( NULL ), local_Y( NULL )
169  {}
170 
172  RDB_shared_data( size_t _id, size_t _P,
173  std::vector< Triplet< double > > *_original,
174  size_t *_nzb,
175  pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond,
176  size_t *_sync, size_t *_end_sync,
177  size_t _ovsize, size_t _ovoffset,
178  const T ** const _in, T ** const _out//,
179  //const T*** const _inX, T*** const _outZ
180  ) :
181  id( _id ), P( _P ), mode( 1 ), repeat( 1 ), original( _original ), nzb( _nzb ), time( 0 ),
182  mutex( _mutex ), cond( _cond ), end_mutex( _end_mutex ), end_cond( _end_cond ),
183  sync( _sync ), end_sync( _end_sync ),
184  output_vector_size( _ovsize ), output_vector_offset( _ovoffset ),
185  local_y( NULL ), input( _in ), output( _out )//,
186  //inputX( _inX ), outputZ( _outZ ), local_Y( NULL )
187  {}
188 };
189 
/**
 * Compares two unsigned long ints, for use with qsort.
 *
 * FIX: the original returned the raw difference `*a - *b`. Unsigned
 * subtraction wraps modulo 2^64 and the result was then truncated to int,
 * yielding the wrong sign whenever the operands differ by more than
 * INT_MAX (e.g. 0 vs ULONG_MAX compared as positive). Use explicit
 * three-way comparison instead, which is always correct.
 *
 * @param a Pointer to the left-hand unsigned long int.
 * @param b Pointer to the right-hand unsigned long int.
 * @return Negative when *a < *b, zero when equal, positive when *a > *b.
 */
int rdbh_uli_compare( const void *a, const void *b ) {
	const unsigned long int lhs = *static_cast< const unsigned long int * >( a );
	const unsigned long int rhs = *static_cast< const unsigned long int * >( b );
	if( lhs < rhs ) return -1;
	if( lhs > rhs ) return  1;
	return 0;
}
194 
200 template< typename T, class MatrixType = FBICRS< T > >
201 class RDBHilbert: public SparseMatrix< T, ULI > {
202 
203  private:
204 
205  protected:
206 
208  static size_t P;
209 
211  std::vector< size_t > p_translate;
212 
214  const T* input;
215 
217  T *output;
218 
220  //const T*__restrict__ const *__restrict__ inputX;
221 
223  //T*__restrict__ const *__restrict__ outputZ;
224 
226  pthread_t *threads;
227 
230 
232  static clockid_t global_clock_id;
233 
235  pthread_mutex_t mutex;
236 
238  pthread_cond_t cond;
239 
241  pthread_mutex_t end_mutex;
242 
244  pthread_cond_t end_cond;
245 
247  size_t sync;
248 
250  size_t end_sync;
251 
253  static const ULI max_n = FBICRS< T >::beta_n;
254 
256  static const ULI max_m = FBICRS< T >::beta_m;
257 
	/**
	 * Determines the thread-ID-to-core mapping used when pinning worker
	 * threads.
	 *
	 * @param _p_translate User-supplied mapping from thread ID to core ID;
	 *        when NULL a default mapping is generated (consecutive cores,
	 *        or a MIC-specific wrapping when compiled with __MIC). When
	 *        non-NULL, the mapping is copied and P is derived from its size.
	 */
	void set_p_translate( std::vector< size_t > *_p_translate ) {
		if( _p_translate == NULL ) {
			//get number of cores available
			//NOTE(review): the statement that initialises P here (presumably
			//P = MachineInfo::getInstance().cores(); per the library's
			//MachineInfo singleton) appears to have been lost in extraction
			//-- confirm against the upstream file.
			for( size_t i = 0; i < P; ++i ) {
#ifdef __MIC
				//(MIC layout) the first 120 thread IDs map to the first two
				//hardware threads of each core; higher IDs to the last two
				const size_t num_threads_times_two = 120; //114 for the 57-core MIC
				//offset controls wether we are dividing over the first
				//2 HW threads or the latter two
				size_t offset = 0;
				//assume the number of cores is i div 2
				size_t core = i / 2;
				//if i >= 120 we go for the latter two HW thread on each core
				if( i >= num_threads_times_two ) {
					//map to the same cores as for i<120
					core = (i-num_threads_times_two) / 2;
					//but at the higher 2 HW threads
					offset = 2;
				}
				//assume the thread number on-core is i mod 2 (plus offset)
				const size_t hwthread = i % 2 + offset;
				//assume consecutively wrapped HW threads, 4 per core
				p_translate.push_back( 4 * core + hwthread );
#else
				//just use consecutive numbering otherwise
				p_translate.push_back( i );
#endif
			}
		} else {
			//user-supplied mapping: copy it and take P from its length
			p_translate = *_p_translate;
			P = p_translate.size();
		}
	}
292 
293  public:
294 
	/**
	 * File-based constructor: builds the thread-to-core translation table,
	 * then loads the matrix from the given file (which in turn spawns the
	 * worker threads via load()).
	 *
	 * @param file         Path of the matrix file to load.
	 * @param zero         Value representing the zero element (default 0).
	 * @param _p_translate Optional thread-to-core pinning map; NULL selects
	 *                     the default mapping (see set_p_translate).
	 */
	RDBHilbert( const std::string file, T zero = 0, std::vector< size_t > *_p_translate = NULL ) {
		set_p_translate( _p_translate );
		this->loadFromFile( file, zero );
	}
300 
	/**
	 * Triplet-based constructor: builds the thread-to-core translation
	 * table, then loads the matrix from an in-memory triplet collection.
	 *
	 * @param input        Nonzero triplets of the matrix.
	 * @param m            Number of matrix rows.
	 * @param n            Number of matrix columns.
	 * @param zero         Value representing the zero element (default 0).
	 * @param _p_translate Optional thread-to-core pinning map; NULL selects
	 *                     the default mapping (see set_p_translate).
	 */
	RDBHilbert( std::vector< Triplet< T > >& input, ULI m, ULI n, T zero = 0, std::vector< size_t > *_p_translate = NULL ) {
		set_p_translate( _p_translate );
		load( input, m, n, zero );
	}
306 
308  virtual ~RDBHilbert() {
309  //set all daemon threads to exit mode
310  for( size_t i = 0; i < P; i++ ) {
311  thread_data[ i ].mode = 4;
312  }
313 
314  //wake up all daemon threads
315  pthread_mutex_lock( &mutex );
316  pthread_cond_broadcast( &cond );
317  pthread_mutex_unlock( &mutex );
318 
319  //allow threads to exit gracefully
320  for( size_t i = 0; i < P; i++ ) {
321  pthread_join( threads[ i ], NULL );
322  }
323 
324  //destroy data
325  delete [] thread_data;
326  delete [] threads;
327  pthread_mutex_destroy( &mutex );
328  pthread_cond_destroy( &cond );
329  }
330 
	/**
	 * Blocks until the worker threads signal completion on end_cond.
	 *
	 * Precondition: the caller must already hold end_mutex; load(), zxa(),
	 * zax() and reset() lock it before broadcasting work to the daemons,
	 * as required by pthread_cond_wait.
	 *
	 * NOTE(review): the wait is not wrapped in a predicate loop, so a
	 * spurious wakeup would make this return before all threads have
	 * actually finished -- confirm whether this is acceptable upstream.
	 */
	void wait() {
		//wait for end signal; atomically releases end_mutex while blocked
		pthread_cond_wait( &end_cond, &end_mutex );
		//pthread_cond_wait returns with end_mutex re-acquired; release it
		pthread_mutex_unlock( &end_mutex );
	}
337 
	/**
	 * Loads a matrix from a triplet collection and spawns the P pinned
	 * worker threads that build and own the distributed data structure.
	 * This function only performs shared-array allocation and thread
	 * forking; all heavy work happens in parallel inside thread().
	 *
	 * @param input Nonzero triplets (shared read-only with the workers
	 *              during initialisation).
	 * @param m     Number of matrix rows.
	 * @param n     Number of matrix columns.
	 * @param zero  Value representing the zero element.
	 */
	virtual void load( std::vector< Triplet< T > >& input, const ULI m, const ULI n, const T zero ) {
#ifndef _NO_LIBNUMA
		//set kernel to local thread allocation if it wasn't already the case
		numa_set_localalloc();
#endif

		//base settings
		this->zero_element = zero;
		this->nor = m;
		this->noc = n;
		this->nnz = input.size();

		// Parallel distribution according to the Hilbert curve:
		// -get maximum Hilbert coordinate
		//  (Note: curve coordinates should be implented such that (0,0) maps to 0!
		//   Otherwise a minimum Hilbert coordinate should also be found, and
		//   below code must be adapted to work on a Hilbert coordinate range
		//   instead)
		// -simultaneously also get number of nonzeroes to locally store
		// -translate to a range
		// -call function to distribute this range over P threads
		//
		// Most operations are done in parallel (see thread function), basically
		// only shared array allocation and the actual forking is in this function.

		//per-row nonzero counters; filled by the workers, freed below after
		//initialisation completes
		size_t *nzb = new size_t [ this->m() ];
		for( ULI i = 0; i < m; i++ ) nzb[ i ] = 0;

		//create P threads :)
		this->threads = new pthread_t[ P ];
		//initialize local initialisation data
		//NOTE(review): a statement appears to have been dropped here during
		//extraction -- presumably the allocation of the per-thread shared
		//data array (thread_data = new RDB_shared_data< T >[ P ];), since
		//thread_data is used below. Confirm against the upstream file.
		//initialize mutexes and conditions and a synchronisation counter
		pthread_mutex_init( &mutex, NULL );
		pthread_cond_init ( &cond, NULL );
		pthread_mutex_init( &end_mutex, NULL );
		pthread_cond_init ( &end_cond, NULL );
		sync = 0;
		end_sync = 0;
		//lock end mutex (disallow threads that are created to signal for end
		//before this thread is done with spawning children)
		pthread_mutex_lock( &end_mutex );
		//go forth and multiply
		for( size_t i = 0; i < P; i++ ) {
			//build thread-local init data
			//NOTE(review): the opening of this statement was lost in
			//extraction -- presumably `thread_data[ i ] = RDB_shared_data< T >(`.
			//Confirm against the upstream file. The -1, -1 arguments are the
			//(size_t sentinel) placeholders for output-vector size/offset,
			//which the workers compute themselves.
			i, P, &input, nzb, &mutex, &cond, &end_mutex, &end_cond,
			&sync, &end_sync, -1, -1, &(this->input), &output
			//, &intputX, outputZ
			);
			//set fixed affinity for threads
			cpu_set_t mask;
			CPU_ZERO( &mask );
			CPU_SET ( p_translate[ i ], &mask );

			//TODO: use hwloc for better numa-aware pinning
			/*hwloc_topology_t topology;
			hwloc_topology_init ( &topology );
			hwloc_topology_load( topology );
			hwloc_bitmap_t cpuset;*/

			//prepare attributes
			pthread_attr_t attr;
			pthread_attr_init( &attr );
			//set fixed affinity in attribute, so that it starts binded immediately
			pthread_attr_setaffinity_np( &attr, sizeof( cpu_set_t ), &mask );
			//fire up thread
			pthread_create( &threads[i], &attr, &RDBHilbert::thread, (void*) &thread_data[i] );
			//free attr
			pthread_attr_destroy( &attr );
		}

		//wait for threads to finish initialisation (releases end_mutex)
		wait();

		//report on fillIn (sum of per-thread fill-in, written by thread())
		size_t fillIn = 0;
		for( size_t i = 0; i < P; i++ ) {
			fillIn += thread_data[ i ].fillIn;
		}
		std::cout << "RDBHilbert: fill-in is " << fillIn << ", which is +" << 100.0*((double)fillIn)/((double)input.size()) << "%" << std::endl;

		//delete temporary array; workers only read nzb during initialisation,
		//which has completed at this point
		delete [] nzb;
	}
424 
426  static void end( pthread_mutex_t* mutex, pthread_cond_t* cond, size_t *sync, const size_t P ) {
427  pthread_mutex_lock( mutex );
428  (*sync)++;
429  if( *sync == P ) {
430  pthread_cond_signal( cond );
431  *sync = 0;
432  }
433  pthread_mutex_unlock( mutex );
434  }
435 
437  static void synchronise( pthread_mutex_t* mutex, pthread_cond_t* cond, size_t *sync, const size_t P ) {
438  pthread_mutex_lock( mutex );
439  (*sync)++;
440  if( *sync == P ) {
441  *sync = 0;
442  pthread_cond_broadcast( cond );
443  } else {
444  pthread_cond_wait( cond, mutex );
445  }
446  pthread_mutex_unlock( mutex );
447  }
448 
	/**
	 * SPMD code per thread. Builds this thread's local Hilbert-ordered
	 * FBICRS data structure (construction phase), then enters daemon mode:
	 * it sleeps on cond and executes whatever operation shared->mode
	 * requests (2 = zax, 3 = zxa, 5 = reset local output, 4 = exit).
	 *
	 * @param data Pointer to this thread's RDB_shared_data<T> record.
	 * @return Always NULL.
	 */
	static void* thread( void *data ) {
		//get short-hand notation
		RDB_shared_data<T>* shared = (RDB_shared_data<T>*)data;
		const size_t id = shared->id;
		const size_t P = shared->P;
		const size_t nnz = shared->original->size();
		pthread_mutex_t *mutex = shared->mutex;
		pthread_cond_t *cond = shared->cond;

		//check if pinned to exactly one thread; abort otherwise, since the
		//scheme relies on fixed one-thread-per-core affinity
		cpu_set_t mask;
		CPU_ZERO( &mask );
		pthread_getaffinity_np( pthread_self(), sizeof( cpu_set_t ), &mask );
		unsigned long int MyMask = ULONG_MAX;
		for( size_t s = 0; s < P; s++ ) {
			if( CPU_ISSET( s, &mask ) ) {
				if( MyMask == ULONG_MAX ) {
					MyMask = s;
				} else {
					std::cerr << "Thread " << id << " mask is larger than one core" << " (" << MyMask << " and " << s << " are both set)!" << std::endl;
					exit( 1 );
				}
			}
		}

		//prepare to get global matrix dimensions
		ULI m, n;
		m = n = 0;
		//put rowsums in nzb; each thread counts a contiguous row chunk of
		//size blocksize, so the counters are written without contention
		const size_t blocksize = (nnz % P) > 0 ? nnz / P + 1 : nnz / P;
		for( size_t i = 0; i < nnz; i++ ) {
			const ULI currow = (*(shared->original))[ i ].i();
			const ULI curcol = (*(shared->original))[ i ].j();
			if( currow >= id * blocksize && currow < (id + 1) * blocksize ) {
				shared->nzb[ currow ]++;
			}
			if( currow > m ) m = currow;
			if( curcol > n ) n = curcol;
		}

		//dimensions are one higher than max indices
		++m;
		++n;

		//sync: all row counters must be complete before reading them below
		RDBHilbert::synchronise( mutex, cond, shared->sync, shared->P );

		//determine distribution: aim for nnz/P nonzeroes per thread
		const size_t nnz_target = nnz / P;
		size_t cursum = 0;

		//first sanity check
		for( ULI i=0; i<m; i++ ) cursum += shared->nzb[ i ];
		assert( cursum == nnz );

		//continue: find this thread's [start, end) row range by scanning the
		//rowsums and cutting whenever the running sum reaches the target
		cursum = 0;
		unsigned long int start, end, k = 0;
		start = end = ULONG_MAX;
		if( id == 0 ) start = 0;
#ifndef NDEBUG
		if( id == 0 ) std::cout << "Breakpoints: ";
#endif
		for( ULI i = 0; i < m; i++ ) {
			cursum += shared->nzb[ i ];
			if( cursum >= nnz_target ) {
#ifndef NDEBUG
				if( id == 0 ) std::cout << (i+1) << " ";
#endif
				//the k-th breakpoint closes thread k's range and opens k+1's
				if( k == id ) end = i + 1;
				if(k+1== id ) start = i + 1;
				k++;
				cursum = 0;
			}
		}
		//overpartitioning guard: threads that got no breakpoint receive an
		//empty range at the matrix end; the last thread always closes at m
		if( start == ULONG_MAX ) start = m;
		if( id == P-1 || end == ULONG_MAX ) end = m;
#ifndef NDEBUG
		if( id == 0 ) std::cout << std::endl;
		RDBHilbert::synchronise( mutex, cond, shared->sync, shared->P );
		std::cout << "Thread " << id << " has rows " << start << " to " << end << std::endl;
#endif
		shared->output_vector_size = end - start;
		shared->output_vector_offset = start;
		assert (shared->output_vector_size <= m );
		assert( shared->output_vector_offset + shared->output_vector_size <= m );

		//copy to local first; row indices are shifted so the local block
		//starts at row 0
		std::vector< Triplet< T > > local;
		for( size_t i=0; i<nnz; i++ ) {
			const ULI currow = (*(shared->original))[ i ].i();
			if( currow >= start && currow < end )
				local.push_back(
					Triplet< T >( (*(shared->original))[ i ].i() - start,
						(*(shared->original))[ i ].j(),
						(*(shared->original))[ i ].value )
				);
		}
		const size_t local_nnz = local.size();
		m = shared->output_vector_size; //new matrix size is new m times old n

		//extract hilbert coordinate info: map each nonzero's block
		//coordinate (row/max_m, col/max_n) to a 128-bit Hilbert coordinate
		size_t h1, h2;
		BigInt maxh( 0, 0 ), minh( ULONG_MAX, ULONG_MAX );
		BigInt *t2h = new BigInt[ local_nnz ];
		for( size_t i=0; i<local_nnz; i++ ) {
			Matrix2HilbertCoordinates::IntegerToHilbert( local[i].i() / max_m, local[i].j() / max_n, h1, h2 );
			const BigInt cur( h1, h2 );
			t2h[ i ] = cur;
			if( maxh < cur ) maxh = cur;
			if( minh > cur ) minh = cur;
			assert( t2h[ i ].high == h1 );
			assert( t2h[ i ].low == h2 );
		}

		//sequential quicksort O(nz*log(nz)) on the Hilbert coordinates
		qsort( t2h, local_nnz, sizeof( BigInt ), bigint_compare );
		assert( local_nnz == 0 || t2h[ local_nnz - 1 ] == maxh );
		assert( local_nnz == 0 || t2h[ 0 ] == minh );

		//build local data structure from the triplets in shared->original
		//and beta block hilbert index to beta translation map
		std::map< BigInt, unsigned long int > h2b;
		std::vector< std::vector< Triplet< T > > > beta;

		//guard against invalid accesses due to overpartitioning
		if( local_nnz > 0 ) {
			//prepare beta and map
			//get first hilbert block coordinate
			BigInt cur = t2h[ 0 ];
			//remember index
			h2b[ cur ] = 0;
			//keep next index
			unsigned long int c = 0, size = 1;
			//remember previous index
			BigInt prev_h = cur;
			//do the same for the remainder of available hilbert coordinates;
			//each run of equal coordinates becomes one beta block
			for( unsigned long int i = 1; i < local_nnz; ++i, ++size ) {
				cur = t2h[ i ];
				if( cur != prev_h ) { //new coordinate
					//push back old vector with exact size
					beta.push_back( std::vector< Triplet< T > >() );
					beta.back().reserve( size );
					//store previous index
					h2b[ prev_h ] = c++;
					//reset
					prev_h = cur;
					size = 1;
				}
			}
			//push back last one
			beta.push_back( std::vector< Triplet< T > >() );
			beta.back().reserve( size );
			h2b[ cur ] = c;
		}

		//prepare to get matrix size (m, n) and some statistics too
		unsigned long int smin_m = ULONG_MAX;
		unsigned long int smin_n = ULONG_MAX;
		unsigned long int smax_m, smax_n;
		smax_m = smax_n = 0;
		unsigned long int *ms = new unsigned long int[ h2b.size() ];
		unsigned long int *ns = new unsigned long int[ h2b.size() ];
		unsigned long int *minms = new unsigned long int[ h2b.size() ];
		unsigned long int *minns = new unsigned long int[ h2b.size() ];
		for( size_t i = 0; i < h2b.size(); i++ ) {
			ms[ i ] = 0;
			ns[ i ] = 0;
			minms[ i ] = ULONG_MAX;
			minns[ i ] = ULONG_MAX;
		}

		//fill beta blocks in correct hilbert order O(local_nnz * log( nonzero_blocks ))
		for( size_t i = 0; i < local_nnz; i++ ) {
			const ULI row = local[ i ].i();
			const ULI col = local[ i ].j();
			Matrix2HilbertCoordinates::IntegerToHilbert( static_cast< size_t >(row / max_m), static_cast< size_t >(col / max_n), h1, h2 );
			const BigInt cur( h1, h2 );
			assert( cur >= minh );
			assert( cur <= maxh );
			assert( row < m );
			const Triplet< T > toAdd = Triplet< T >( row, col, local[ i ].value );
#ifndef NDEBUG
			if( beta[ h2b[ cur ] ].size() > 1 ) {
				//NOTE(review): a line appears to have been lost here in
				//extraction -- presumably the opening of a
				//Matrix2HilbertCoordinates::IntegerToHilbert( call taking
				//the two arguments below. Confirm against the upstream file.
					beta[ h2b[ cur ] ].back().i() / max_m,
					beta[ h2b[ cur ] ].back().j() / max_n, h1, h2 );
				assert( cur == BigInt( h1, h2 ) );
				assert( beta[ h2b[ cur ] ].back().i() / max_m == row / max_m );
				assert( beta[ h2b[ cur ] ].back().j() / max_n == col / max_n );
			}
#endif
			beta[ h2b[ cur ] ].push_back( toAdd );
			if( row > ms[ h2b[ cur ] ] ) ms[ h2b[ cur ] ] = row;
			if( col > ns[ h2b[ cur ] ] ) ns[ h2b[ cur ] ] = col;
			if( row < minms[ h2b[ cur ] ] ) minms[ h2b[ cur ] ] = row;
			if( col < minns[ h2b[ cur ] ] ) minns[ h2b[ cur ] ] = col;
			if( row < smin_m ) smin_m = row;
			if( col < smin_n ) smin_n = col;
			if( row > smax_m ) smax_m = row;
			if( col > smax_n ) smax_n = col;
		}
		//size is max value + 1
		smax_m++; smax_n++;

#ifndef NDEBUG
		cursum = 0;
		for( size_t i = 0; i < beta.size(); i++ ) {
			cursum += beta[i].size();
		}
		assert( cursum == local_nnz );
		std::cout << "Thread " << shared->id << ": " << smin_m << "," << smax_m << " times " << smin_n << "," << smax_n << " holding " << cursum << " nonzeroes." << std::endl;

		//sanity check: everything in one beta vector should share the
		//same block.
		for( size_t i = 0; i < beta.size(); ++i ) {
			const ULI row_br = beta[i][0].i() / max_m;
			const ULI col_br = beta[i][0].j() / max_n;
			for( size_t k = 1; k < beta[i].size(); ++k ) {
				assert( beta[i][k].i() / max_m == row_br );
				assert( beta[i][k].j() / max_n == col_br );
			}
		}
#endif

		//remove temporary values
		delete [] ms;
		delete [] ns;
		delete [] minms;
		delete [] minns;
		delete [] t2h;

		//load into FBICRS
		MatrixType dss( beta, smax_m - smin_m, n );
		beta.clear();

		//remember memory use; read by the master in load()/bytesUsed()
		shared->bytes = dss.bytesUsed();
		shared->fillIn = dss.fillIn;

		//create local shadow of y to avoid write-contention; with
		//RDBH_GLOBAL_Y thread 0 writes directly into the global vector
		//and allocates no shadow
		T* y = NULL;
#ifdef RDBH_GLOBAL_Y
		if( id > 0 ) {
#endif
			y = new T[ shared->output_vector_size ];
			for( size_t i = 0; i < shared->output_vector_size; i++ )
				y[ i ] = 0.0;
#ifdef RDBH_GLOBAL_Y
		}
#endif
		shared->local_y = y;

		//create a cache of multiple output vectors in case of ZaX or ZXa (TODO)
		//T ** Y = NULL;

		//exit construction mode
		shared->mode = 0;

		//signal end of construction; mutex is deliberately taken BEFORE the
		//end() signal so this thread is already inside the lock when it
		//enters cond_wait below, and cannot miss the master's broadcast
		pthread_mutex_lock( mutex );
		RDBHilbert::end( shared->end_mutex, shared->end_cond, shared->end_sync, shared->P );

		//enter daemon mode
		while( true ) {
			struct timespec clk_start, clk_stop;
			pthread_cond_wait( cond, mutex );
			pthread_mutex_unlock( mutex );

			if( shared->mode == 4 ) break;

			switch( shared->mode ) {
/*			case 7:
				assert( *(shared->inputX) != NULL );
				assert( *(shared->outputZ) != NULL );
#ifdef RDBH_GLOBAL_Y
				if( id == 0 ) {
					Y = *(shared->outputZ);
					shared->local_Y = Y;
				}
#endif
				assert( Y != NULL );

				clock_gettime( global_clock_id, &clk_start);
				shared->time = 0.0;
				for( size_t i = 0; i < shared->repeat; i++ ) {
					dss.ZXa( shared->k, shared->inputX, Y );
				}
				clock_gettime( global_clock_id, &clk_stop);
				shared->time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
				shared->time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;
#ifndef RDBH_NO_COLLECT
				collectYs( shared );
#endif
				break;
			case 6:
				assert( *(shared->inputX) != NULL );
				assert( *(shared->outputZ) != NULL );
#ifdef RDBH_GLOBAL_Y
				if( id == 0 ) {
					Y = *(shared->outputZ);
					shared->local_Y = Y;
				}
#endif
				assert( Y != NULL );

				clock_gettime( global_clock_id, &clk_start);
				shared->time = 0.0;
				for( size_t i = 0; i < shared->repeat; i++ ) {
					dss.ZaX( shared->k, shared->inputX, Y );
				}
				clock_gettime( global_clock_id, &clk_stop);
				shared->time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
				shared->time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;
#ifndef RDBH_NO_COLLECT
				collectYs( shared );
#endif
				break;*/
			case 5:
				//reset: zero out the local shadow output (thread 0 skips
				//this under RDBH_GLOBAL_Y; the caller resets the global y)
#ifdef RDBH_GLOBAL_Y
				if( shared->id != 0) {
#endif
					for( size_t i = 0; i < shared->output_vector_size; ++i ) {
						shared->local_y[ i ] = 0;
					}
#ifdef RDBH_GLOBAL_Y
				}
#endif
				break;
			case 3:
				//zxa: z = x * A on the local block, repeated `repeat` times
				assert( *(shared->input) != NULL );
				assert( *(shared->output) != NULL );
#ifdef RDBH_GLOBAL_Y
				if( id == 0 ) {
					y = *(shared->output);
					shared->local_y = y;
				}
#endif
				assert( y != NULL );

				clock_gettime( global_clock_id, &clk_start);
				shared->time = 0.0;
				for( size_t i = 0; i < shared->repeat; i++ ) {
					dss.zxa( *(shared->input), y );
				}
				clock_gettime( global_clock_id, &clk_stop);
				//elapsed wall time in milliseconds
				shared->time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
				shared->time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;

#ifndef RDBH_NO_COLLECT
				collectY( shared );
#endif
				break;
			case 2:
				//zax: z = A * x on the local block, repeated `repeat` times
				assert( *(shared->input) != NULL );
				assert( *(shared->output) != NULL );
#ifdef RDBH_GLOBAL_Y
				if( id == 0 ) {
					y = *(shared->output);
					shared->local_y = y;
				}
#endif
				assert( y != NULL );

				clock_gettime( global_clock_id, &clk_start);
				shared->time = 0.0;
#ifdef _WITH_SNIPER
				parmacs_roi_begin();
#endif
				for( size_t i = 0; i < shared->repeat; i++ ) {
					dss.zax( *(shared->input), y );
				}
#ifdef _WITH_SNIPER
				parmacs_roi_end();
#endif
				clock_gettime( global_clock_id, &clk_stop);
				//elapsed wall time in milliseconds
				shared->time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
				shared->time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;

#ifndef RDBH_NO_COLLECT
				collectY( shared );
#endif
				break;
			default:
				std::cout << "Thread " << id << ": Error, undefined operation (" << shared->mode << ")!" << std::endl;
				exit( -1 );
			}
			shared->mode = 0;

			//signal end of operation
			//NOTE(review): this passes shared->sync where the construction
			//signal above passes shared->end_sync. It still works because
			//every thread consistently uses shared->sync here (the counter
			//reaches P regardless), but the inconsistency looks accidental
			//-- confirm upstream.
			pthread_mutex_lock( mutex );
			RDBHilbert::end( shared->end_mutex, shared->end_cond, shared->sync, shared->P );
		}

		//done; thread 0's y aliases the global output under RDBH_GLOBAL_Y
		//and must not be freed here
#ifdef RDBH_GLOBAL_Y
		if( id != 0 )
#endif
			delete [] y;
		return (NULL);
	}
851 
853  static void collectY( RDB_shared_data<T> *shared ) {
854 
855 #ifdef RDBH_GLOBAL_Y
856  //FIXME It could be possible to distribute work over all processors
857  //instead of p-1 processors, but this requires some extra balancing.
858  const size_t s = shared->id;
859  if( s == 0 ) return;
860 #endif
861 
862  //do collect items of own block
863  for( size_t i = 0; i < shared->output_vector_size; i++ ) {
864  assert( *(shared->output) != NULL );
865  assert( shared->local_y != NULL );
866  (*(shared->output))[ shared->output_vector_offset + i ] += shared->local_y[ i ];
867  }
868  }
869 
870  /*static void collectYs( RDB_shared_data<T> *shared ) {
871  //FIXME see above collectY
872 #ifdef RDBH_GLOBAL_Y
873  if( shared->id == 0 ) return;
874 #endif
875  //do collect items of own block
876  for( size_t i = 0; i < shared->output_vector_size; i++ ) {
877  assert( *(shared->outputZ) != NULL );
878  assert( shared->local_Y != NULL );
879  for( size_t s = 0; s < shared->k; ++ s ) {
880  (*(shared->outputZ))[s][ shared->output_vector_offset + i ] += shared->local_Y[ s ][ i ];
881  }
882  }
883  //done
884  }*/
885 
	/**
	 * Overloaded mv call; allocates an output vector, zeroes all local
	 * shadow outputs, and performs z = A * x into the fresh vector.
	 *
	 * NOTE: the caller owns the returned buffer and must free it with the
	 * allocator matching the build (numa_free with the allocation size, or
	 * _mm_free) -- a plain delete[] would be wrong.
	 *
	 * @param x Input vector of length at least noc.
	 * @return Newly allocated output vector holding A * x.
	 */
	virtual T* mv( const T* x ) {
		//over-allocate in case we use matrices that are too small to be
		//vectorised internally (assuming we are using vecBICRS as sub_ds).
		size_t allocsize = (this->nor + 1) * sizeof( T );
		//allocate, either using libnuma (to interleave for i86)
		//or using dynamic aligned allocs (for Xeon Phi MICs)
#ifndef _NO_LIBNUMA
		T* ret = (T*) numa_alloc_interleaved( allocsize );
#else
		T* ret = (T*) _mm_malloc( allocsize, 64 ); //instead of `new T[ nor ];'
#endif
		//set to 0-vector
		for( ULI i = 0; i < this->nor; i++ ) {
			ret[ i ] = this->zero_element;
		}

		//reset output vectors (the per-thread shadow copies)
		reset();

		//do in-place SpMV
		zax( x, ret );

		//return new output vector
		return ret;
	}
912 
	/**
	 * Computes z = x * A in parallel with a single repetition.
	 * Delegates to the three-argument overload.
	 *
	 * @param x Input vector.
	 * @param z Output vector (results are added in).
	 */
	virtual void zxa( const T* x, T* z ) {
		zxa( x, z, 1 );
	}
917 
919  virtual void zxa( const T* x, T* z, const size_t repeat ) {
920  //set all daemon threads to do zxa
921  for( size_t i=0; i<P; i++ ) {
922  thread_data[ i ].mode = 3;
923  thread_data[ i ].repeat = repeat;
924  }
925 
926  //set input vector
927  input = x;
928 
929  //set output vector
930  output = z;
931 
932  //wake up all daemon threads
933  pthread_mutex_lock( &end_mutex );
934  pthread_mutex_lock( &mutex );
935  pthread_cond_broadcast( &cond );
936  pthread_mutex_unlock( &mutex );
937 
938  //wait for end of operation
939  wait();
940 
941  //unset vectors
942  input = NULL;
943  output = NULL;
944  }
945 
946  /*virtual void ZXa( const size_t k, const T *__restrict__ const *__restrict__ const X, T *__restrict__ const *__restrict__ const Z ) {
947  ZXa( k, X, Z, 1 );
948  }
949 
950  virtual void ZXa( const size_t k, const T *__restrict__ const *__restrict__ const X, T *__restrict__ const *__restrict__ const Z, const size_t repeat ) {
951  //set all daemon threads to do ZXa
952  for( size_t i = 0; i < P; i++ ) {
953  thread_data[ i ].k = k;
954  thread_data[ i ].mode = 7;
955  thread_data[ i ].repeat = repeat;
956  }
957 
958  //set input/output vectors
959  inputX = X;
960  outputZ = Z;
961 
962  //wake up all daemon threads
963  pthread_mutex_lock( &end_mutex );
964  pthread_mutex_lock( &mutex );
965  pthread_cond_broadcast( &cond );
966  pthread_mutex_unlock( &mutex );
967  //wait for end of operation
968  wait();
969 
970  //unset vectors
971  inputX = NULL;
972  outputZ = NULL;
973  }*/
974 
976  void reset() {
977  //reset all local output vectors
978  for( size_t i=0; i<P; i++ ) {
979  thread_data[ i ].mode = 5;
980  }
981 
982  //wake up all daemon threads
983  pthread_mutex_lock( &end_mutex );
984  pthread_mutex_lock( &mutex );
985  pthread_cond_broadcast( &cond );
986  pthread_mutex_unlock( &mutex );
987 
988  //wait for end of operation
989  wait();
990  }
991 
	/**
	 * Computes z = A * x in parallel with a single repetition.
	 * Delegates to the full overload with clock ID 0 and no
	 * elapsed-time output.
	 *
	 * @param x Input vector.
	 * @param z Output vector (results are added in).
	 */
	virtual void zax( const T* x, T* z ) {
		zax( x, z, 1, 0, NULL );
	}
996 
998  virtual void zax( const T* x, T* z, const size_t repeat, const clockid_t clock_id, double *elapsed_time ) {
999 
1000  //set all daemon threads to do zax
1001  for( size_t i = 0; i < P; i++ ) {
1002  thread_data[ i ].mode = 2;
1003  thread_data[ i ].repeat = repeat;
1004  }
1005 
1006  //set global clock ID
1007  global_clock_id = clock_id;
1008 
1009  //set input vector
1010  input = x;
1011 
1012  //set output vector
1013  output = z;
1014 
1015  //wake up all daemon threads
1016  pthread_mutex_lock( &end_mutex );
1017  pthread_mutex_lock( &mutex );
1018  pthread_cond_broadcast( &cond );
1019  pthread_mutex_unlock( &mutex );
1020 
1021  //wait for end of operation
1022  wait();
1023 
1024  //get elapsed time
1025  double maxtime = 0.0;
1026  for( size_t i = 0; i < P; i++ ) {
1027  const double curtime = thread_data[ i ].time;
1028  if( curtime > maxtime ) maxtime = curtime;
1029  }
1030  if( elapsed_time != NULL )
1031  *elapsed_time += maxtime;
1032 
1033  //unset vectors
1034  input = NULL;
1035  output = NULL;
1036  }
1037 
1038  /*virtual void ZaX( const size_t k, const T*__restrict__ const *__restrict__ const X, T *__restrict__ const *__restrict__ const Z ) {
1039  ZaX( k, X, Z, 1, 0, NULL );
1040  }
1041 
1042  virtual void ZaX( const size_t k, const T*__restrict__ const *__restrict__ const X, T *__restrict__ const *__restrict__ const Z,
1043  const size_t repeat, const clockid_t clock_id, double *elapsed_time )
1044  {
1045  //set all daemon threads to do zax
1046  for( size_t i = 0; i < P; i++ ) {
1047  thread_data[ i ].k = k;
1048  thread_data[ i ].mode = 6;
1049  thread_data[ i ].repeat = repeat;
1050  }
1051 
1052  //set global clock ID
1053  global_clock_id = clock_id;
1054 
1055  //set input/output vectors
1056  inputX = X;
1057  outputZ = Z;
1058 
1059  //wake up all daemon threads
1060  pthread_mutex_lock( &end_mutex );
1061  pthread_mutex_lock( &mutex );
1062  pthread_cond_broadcast( &cond );
1063  pthread_mutex_unlock( &mutex );
1064  //wait for end of operation
1065  wait();
1066 
1067  //get elapsed time
1068  double maxtime = 0.0;
1069  for( size_t i = 0; i < P; i++ ) {
1070  const double curtime = thread_data[ i ].time;
1071  if( curtime > maxtime ) maxtime = curtime;
1072  }
1073  if( elapsed_time != NULL )
1074  *elapsed_time += maxtime;
1075 
1076  //unset vectors
1077  inputX = NULL;
1078  outputZ = NULL;
1079  }*/
1080 
	/**
	 * Function disabled for this parallel scheme: each worker owns its own
	 * sub-matrix, so no single "first" index pair exists. Prints a warning
	 * to stderr and leaves i and j untouched.
	 *
	 * @param i (unused) would-be row index of the first nonzero.
	 * @param j (unused) would-be column index of the first nonzero.
	 */
	virtual void getFirstIndexPair( ULI &i, ULI &j ) {
		std::cerr << "Warning: RDBHilbert::getFirstIndexPair has no unique answer since it implements a parallel multiplication!\nIgnoring call..." << std::endl;
	}
1085 
1087  virtual size_t bytesUsed() {
1088  size_t ret = 0;
1089  for( size_t s = 0; s < P; ++s )
1090  ret += thread_data[ s ].bytes;
1091  return ret;
1092  }
1093 
1094 };
1095 
1096 template< typename T, class MatrixType > size_t RDBHilbert< T, MatrixType >::P = 0;
1097 
1098 template< typename T, class MatrixType > clockid_t RDBHilbert< T, MatrixType >::global_clock_id = 0;
1099 
1100 #endif
1101 
Hierarchical BICRS with fixed subblock size and distribution.
Definition: FBICRS.hpp:75
ULI nnz
Number of non-zeros.
Definition: SparseMatrix.hpp:58
virtual void zxa(const T *x, T *z)
Definition: RDBHilbert.hpp:914
virtual T * mv(const T *x)
Overloaded mv call; allocates output vector using numa_interleaved.
Definition: RDBHilbert.hpp:887
pthread_mutex_t end_mutex
Wait for end mechanism: mutex.
Definition: RDBHilbert.hpp:241
pthread_cond_t * cond
Pointer to the sync condition.
Definition: RDBHilbert.hpp:122
size_t output_vector_offset
Output vector offset (w.r.t.
Definition: RDBHilbert.hpp:140
double time
Will store local timing.
Definition: RDBHilbert.hpp:110
static const ULI max_m
Given FBICRS, the maximum value for the rowwise matrix size, assuming short ints on ICRS at the lower...
Definition: RDBHilbert.hpp:256
virtual void zxa(const T *x, T *z, const size_t repeat)
Definition: RDBHilbert.hpp:919
size_t * end_sync
End counter.
Definition: RDBHilbert.hpp:134
size_t * nzb
Will store rowsums.
Definition: RDBHilbert.hpp:107
T * local_y
Pointer to the local output vector.
Definition: RDBHilbert.hpp:143
pthread_cond_t cond
Stop/continue mechanism: condition.
Definition: RDBHilbert.hpp:238
const T ** input
Pointer to a global input vector.
Definition: RDBHilbert.hpp:146
size_t bytes
Will store the local amount of bytes used.
Definition: RDBHilbert.hpp:113
unsigned long int cores() const
The number of available cores.
Definition: MachineInfo.cpp:77
static void synchronise(pthread_mutex_t *mutex, pthread_cond_t *cond, size_t *sync, const size_t P)
Definition: RDBHilbert.hpp:437
A 128-bit integer, with overloaded comparison operators.
Definition: BigInt.hpp:38
virtual unsigned long int m()
Queries the number of rows this matrix contains.
Definition: SparseMatrix.hpp:107
size_t P
Number of SPMD processes.
Definition: RDBHilbert.hpp:89
T * output
Output vector.
Definition: RDBHilbert.hpp:217
virtual void load(std::vector< Triplet< T > > &input, const ULI m, const ULI n, const T zero)
Definition: RDBHilbert.hpp:339
static const MachineInfo & getInstance()
Gets a singleton instance.
Definition: MachineInfo.cpp:38
virtual void getFirstIndexPair(ULI &i, ULI &j)
Definition: RDBHilbert.hpp:1082
unsigned char mode
0 undef, 1 init, 2 zax, 3 zxa, 4 exit, 5 reset, 6 ZaX, 7 ZXa.
Definition: RDBHilbert.hpp:95
static void IntegerToHilbert(const size_t i, const size_t j, size_t &h1, size_t &h2)
New method, October 2010.
Definition: Matrix2HilbertCoordinates.cpp:48
RDBHilbert(const std::string file, T zero=0, std::vector< size_t > *_p_translate=NULL)
Default file-based constructor.
Definition: RDBHilbert.hpp:296
The Beta Hilbert triplet scheme.
Definition: RDBHilbert.hpp:201
size_t output_vector_size
Output vector length.
Definition: RDBHilbert.hpp:137
pthread_cond_t * end_cond
Pointer to the end condition.
Definition: RDBHilbert.hpp:128
void loadFromFile(const std::string file, const T zero=0)
Function which loads a matrix from a matrix market file.
Definition: SparseMatrix.hpp:89
Interface common to all sparse matrix storage schemes.
Definition: SparseMatrix.hpp:46
size_t end_sync
Used for construction end signal.
Definition: RDBHilbert.hpp:250
size_t repeat
How many times to repeat the operation set in `mode' (above, only for 2, 3, 6 and 7) ...
Definition: RDBHilbert.hpp:98
T ** output
Pointer to a global output vector.
Definition: RDBHilbert.hpp:149
pthread_mutex_t * end_mutex
Pointer to the end mutex.
Definition: RDBHilbert.hpp:125
ULI noc
Number of columns.
Definition: SparseMatrix.hpp:55
virtual ~RDBHilbert()
Base deconstructor.
Definition: RDBHilbert.hpp:308
std::vector< size_t > p_translate
Which processors to pin threads to.
Definition: RDBHilbert.hpp:211
RDB_shared_data()
Base constructor.
Definition: RDBHilbert.hpp:161
static size_t P
Number of threads to fire up.
Definition: RDBHilbert.hpp:208
static clockid_t global_clock_id
Clock type used for thread-local timing.
Definition: RDBHilbert.hpp:232
virtual size_t bytesUsed()
Definition: RDBHilbert.hpp:1087
static const ULI max_n
Given FBICRS, the maximum value for columnwise matrix size.
Definition: RDBHilbert.hpp:253
void reset()
Definition: RDBHilbert.hpp:976
RDBHilbert(std::vector< Triplet< T > > &input, ULI m, ULI n, T zero=0, std::vector< size_t > *_p_translate=NULL)
Default triplet-based constructor.
Definition: RDBHilbert.hpp:302
void wait()
Definition: RDBHilbert.hpp:332
size_t sync
Used for synchronising threads.
Definition: RDBHilbert.hpp:247
pthread_mutex_t * mutex
Pointer to the sync mutex.
Definition: RDBHilbert.hpp:119
static void end(pthread_mutex_t *mutex, pthread_cond_t *cond, size_t *sync, const size_t P)
Definition: RDBHilbert.hpp:426
static void collectY(RDB_shared_data< T > *shared)
Definition: RDBHilbert.hpp:853
ULI nor
Number of rows.
Definition: SparseMatrix.hpp:52
size_t * sync
Sync counter.
Definition: RDBHilbert.hpp:131
pthread_t * threads
Array of POSIX thread handles, one per SPMD thread.
Definition: RDBHilbert.hpp:226
RDB_shared_data(size_t _id, size_t _P, std::vector< Triplet< double > > *_original, size_t *_nzb, pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond, size_t *_sync, size_t *_end_sync, size_t _ovsize, size_t _ovoffset, const T **const _in, T **const _out)
Recommended constructor.
Definition: RDBHilbert.hpp:172
std::vector< Triplet< T > > * original
Pointer to the original triplet-based input collection.
Definition: RDBHilbert.hpp:104
T zero_element
The element considered to be zero.
Definition: SparseMatrix.hpp:63
pthread_mutex_t mutex
Stop/continue mechanism: mutex.
Definition: RDBHilbert.hpp:235
pthread_cond_t end_cond
Wait for end mechanism: condition.
Definition: RDBHilbert.hpp:244
const T * input
Input vector.
Definition: RDBHilbert.hpp:214
RDB_shared_data< T > * thread_data
array of initial thread data
Definition: RDBHilbert.hpp:229
size_t fillIn
Will store the total fillIn at this thread.
Definition: RDBHilbert.hpp:116
static void * thread(void *data)
Definition: RDBHilbert.hpp:450
virtual void zax(const T *x, T *z)
Definition: RDBHilbert.hpp:993
virtual unsigned long int n()
Queries the number of columns this matrix contains.
Definition: SparseMatrix.hpp:115
A single triplet value.
Definition: Triplet.hpp:52
size_t id
Process ID.
Definition: RDBHilbert.hpp:86
virtual void zax(const T *x, T *z, const size_t repeat, const clockid_t clock_id, double *elapsed_time)
Definition: RDBHilbert.hpp:998
Shared data for RDBHilbert threads.
Definition: RDBHilbert.hpp:81
void set_p_translate(std::vector< size_t > *_p_translate)
Sets p_translate to 0..P-1 by default, or equal to the optionally supplied vector.
Definition: RDBHilbert.hpp:259