// SparseLibrary  Version 1.6.0
// BetaHilbert.hpp
/*
 * Copyright (c) 2007-2014, A. N. Yzelman, Utrecht University 2007-2011;
 * KU Leuven 2011-2014.
 * R. H. Bisseling, Utrecht University 2007-2014.
 *
 * This file is part of the Sparse Library.
 *
 * This library was developed under supervision of Prof. dr. Rob H. Bisseling at
 * Utrecht University, from 2007 until 2011. From 2011-2014, development continued
 * at KU Leuven, where Prof. dr. Dirk Roose contributed significantly to the ideas
 * behind the newer parts of the library code.
 *
 * The Sparse Library is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * The Sparse Library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with the Sparse Library. If not, see <http://www.gnu.org/licenses/>.
 */

/* \file
 * File created by:
 * A. N. Yzelman, Dept. of Computer Science, KU Leuven, 2011.
 */
31 
#include <map>
#include <vector>
#include <iostream>
#include <limits.h>
#include <pthread.h>

#ifndef _NO_LIBNUMA
 #include <numa.h>
#endif

#ifndef SIZE_MAX
 #define SIZE_MAX static_cast< size_t >( -1 )
#endif

#include "SparseMatrix.hpp"
#include "Matrix2HilbertCoordinates.hpp"
#include "FBICRS.hpp"
#include "MachineInfo.hpp"

#ifndef _H_BETAHILBERT
#define _H_BETAHILBERT
53 
/** Which output-vector collect strategy to use after each SpMV step:
 *  0 = sequential collect by the calling thread after the SPMD section,
 *  1 = block-distributed parallel collect,
 *  2 = parallel tree reduction with fan-in BH_REDUCE_BASE,
 *  3 = synchronised pairwise tree reduction. */
#define BH_COLLECT 1

/** Fan-in of the parallel reduction used when BH_COLLECT equals 2. */
#define BH_REDUCE_BASE 2

/** If nonzero, thread 0 uses the global output vector as its local y
 *  (saves one local vector and one reduction step, at the cost of
 *  possible remote accesses by thread 0). */
#define BH_USE_GLOBAL_Y 1
84 
88 template< typename T >
89 class shared_data {
90 
91  public:
92 
94  size_t id;
95 
97  size_t P;
98 
100  unsigned char mode;
101 
103  size_t repeat;
104 
106  std::vector< Triplet< T > > *original;
107 
109  size_t *nzb;
110 
112  size_t **nzc;
113 
115  double time;
116 
118  size_t bytes;
119 
121  pthread_mutex_t* mutex;
122 
124  pthread_cond_t* cond;
125 
127  pthread_mutex_t* end_mutex;
128 
130  pthread_cond_t* end_cond;
131 
133  size_t *sync;
134 
136  size_t *end_sync;
137 
140 
143 
146 
148  const T **input;
149 
151  T **output;
152 
154  shared_data(): id( -1 ), P( -1 ), mode( 0 ), original( NULL ), nzb( NULL ), nzc( NULL ), time( 0 ),
155  mutex( NULL ), cond( NULL ), end_mutex( NULL ), end_cond( NULL ), sync( NULL ),
156  end_sync( NULL ), output_vector_size( -1 ), output_vector_offset( -1 ),
157  local_y( NULL ), input( NULL ), output( NULL ) {}
158 
177  shared_data( size_t _id, size_t _P, std::vector< Triplet< double > > *_original, size_t *_nzb, size_t **_nzc,
178  pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond,
179  size_t *_sync, size_t *_end_sync, size_t _m,
180  const T **_in, T**_out ):
181  id( _id ), P( _P ), mode( 1 ), original( _original ), nzb( _nzb ), nzc( _nzc ), time( 0 ),
182  mutex( _mutex ), cond( _cond ), end_mutex( _end_mutex ), end_cond( _end_cond ),
183  sync( _sync ), end_sync( _end_sync ), output_vector_size( _m ), output_vector_offset( -1 ),
184  local_y( NULL ), input( _in ), output( _out ) {}
185 };
186 
/** qsort comparator for arrays of size_t, ascending.
 *  @param a Pointer to the left-hand size_t.
 *  @param b Pointer to the right-hand size_t.
 *  @return Negative, zero, or positive when *a is less than, equal to, or
 *          greater than *b, respectively.
 *  FIX(review): the original returned the unsigned difference cast to int,
 *  which wraps for values differing by more than INT_MAX (and for any
 *  a < b, since size_t subtraction cannot go negative), producing an
 *  inconsistent ordering. Use an explicit three-way comparison instead. */
int beta_uli_compare( const void *a, const void *b ) {
	const size_t lhs = *static_cast< const size_t * >( a );
	const size_t rhs = *static_cast< const size_t * >( b );
	if( lhs < rhs ) return -1;
	if( lhs > rhs ) return  1;
	return 0;
}
191 
194 template< typename T >
195 class BetaHilbert: public SparseMatrix< T, ULI > {
196 
197  private:
198 
199  protected:
200 
202  static size_t P;
203 
205  std::vector< unsigned short int > p_translate;
206 
208  const T* input;
209 
211  T* output;
212 
214  pthread_t *threads;
215 
218 
220  static clockid_t global_clock_id;
221 
223  pthread_mutex_t mutex;
224 
226  pthread_cond_t cond;
227 
229  pthread_mutex_t end_mutex;
230 
232  pthread_cond_t end_cond;
233 
235  size_t sync;
236 
238  size_t end_sync;
239 
241  static const ULI max_n = FBICRS< T >::beta_n;
242 
244  static const ULI max_m = FBICRS< T >::beta_m;
245 
247  void set_p_translate( std::vector< unsigned short int > *_p_translate ) {
248  if( _p_translate == NULL ) {
249  //get number of cores available
251  for( unsigned short int i = 0; i < P; ++i ) p_translate.push_back( i );
252  } else {
253  p_translate = *_p_translate;
254  P = p_translate.size();
255  }
256  }
257 
258  public:
259 
267  BetaHilbert( const std::string file, T zero = 0, std::vector< unsigned short int > *_p_translate = NULL ): input( NULL ), output( NULL ) {
268  set_p_translate( _p_translate );
269  this->loadFromFile( file, zero );
270  }
271 
281  BetaHilbert( std::vector< Triplet< T > >& input, ULI m, ULI n, T zero = 0, std::vector< unsigned short int > *_p_translate = NULL ): input( NULL ), output( NULL ) {
282  set_p_translate( _p_translate );
283  load( input, m, n, zero );
284  }
285 
287  virtual ~BetaHilbert() {
288  //set all daemon threads to exit mode
289  for( size_t i=0; i<P; i++ ) {
290  thread_data[ i ].mode = 4;
291  }
292 
293  //wake up all daemon threads
294  pthread_mutex_lock( &mutex );
295  pthread_cond_broadcast( &cond );
296  pthread_mutex_unlock( &mutex );
297 
298  //allow threads to exit gracefully
299  for( size_t i=0; i<P; i++ )
300  pthread_join( threads[ i ], NULL );
301 
302  //destroy data
303  delete [] thread_data;
304  delete [] threads;
305  pthread_mutex_destroy( &mutex );
306  pthread_cond_destroy( &cond );
307  }
308 
310  void wait() {
311  //wait for end signal
312  pthread_cond_wait( &end_cond, &end_mutex );
313  pthread_mutex_unlock( &end_mutex );
314  }
315 
324  virtual void load( std::vector< Triplet< T > >& input, const ULI m, const ULI n, const T zero ) {
325 #ifndef _NO_LIBNUMA
326  //set kernel to local thread allocation if it wasn't already the case
327  numa_set_localalloc();
328 #endif
329 
330  //base settings
331  this->zero_element = zero;
332  this->nor = m;
333  this->noc = n;
334  this->nnz = input.size();
335 
336  // Parallel distribution according to the Hilbert curve:
337  // -get maximum Hilbert coordinate
338  // (Note: curve coordinates should be implented such that (0,0) maps to 0!
339  // Otherwise a minimum Hilbert coordinate should also be found, and
340  // below code must be adapted to work on a Hilbert coordinate range
341  // instead)
342  // -simultaneously also get number of nonzeroes to locally store
343  // -translate to a range
344  // -call function to distribute this range over P threads
345  //
346  // Most operations are done in parallel (see thread function), basically
347  // only shared array allocation and the actual forking is in this function.
348 
349  size_t *nzb = new size_t [ this->nnz + P ];
350  size_t **nzc = new size_t*[ P ];
351  for( size_t i=0; i<P; i++ ) nzc[ i ] = NULL;
352 
353  //create P threads :)
354  this->threads = new pthread_t[ P ];
355  //initialize local initialisation data
356  thread_data = new shared_data< T >[ P ];
357  //initialize mutexes and conditions and a synchronisation counter
358  pthread_mutex_init( &mutex, NULL );
359  pthread_cond_init ( &cond, NULL );
360  pthread_mutex_init( &end_mutex, NULL );
361  pthread_cond_init ( &end_cond, NULL );
362  sync = 0;
363  end_sync = 0;
364  //lock end mutex (disallow threads that are created to signal for end
365  //before this thread is done with spawning children)
366  pthread_mutex_lock( &end_mutex );
367  //go forth and multiply
368  for( size_t i=0; i<P; i++ ) {
369  //build thread-local init data
370  thread_data[ i ] = shared_data< T >( i, P, &input, nzb, nzc, &mutex, &cond, &end_mutex, &end_cond, &sync, &end_sync, this->m(), &(this->input), &output );
371  //set fixed affinity for threads
372  cpu_set_t mask;
373  CPU_ZERO( &mask );
374  CPU_SET ( p_translate[ i ], &mask );
375  //prepare attributes
376  pthread_attr_t attr;
377  pthread_attr_init( &attr );
378  //set fixed affinity in attribute, so that it starts binded immediately
379  pthread_attr_setaffinity_np( &attr, sizeof( cpu_set_t ), &mask );
380  //fire up thread
381  pthread_create( &threads[ i ], &attr, &BetaHilbert::thread, (void*) &thread_data[i] );
382  //free attr
383  pthread_attr_destroy( &attr );
384  }
385 
386  //wait for threads to finish initialisation
387  wait();
388 
389  //delete temporary array
390  delete [] nzb;
391  delete [] nzc;
392  }
393 
402  static void end( pthread_mutex_t* mutex, pthread_cond_t* cond, size_t *sync, const size_t P ) {
403  pthread_mutex_lock( mutex );
404  (*sync)++;
405  if( *sync == P ) {
406  pthread_cond_signal( cond );
407  *sync = 0;
408  }
409  pthread_mutex_unlock( mutex );
410  }
411 
420  static void synchronise( pthread_mutex_t* mutex, pthread_cond_t* cond, size_t *sync, const size_t P ) {
421  pthread_mutex_lock( mutex );
422  (*sync)++;
423  if( *sync == P ) {
424  *sync = 0;
425  pthread_cond_broadcast( cond );
426  } else
427  pthread_cond_wait( cond, mutex );
428  pthread_mutex_unlock( mutex );
429  }
430 
437  static void* thread( void *data ) {
438  shared_data<T>* shared = (shared_data<T>*)data;
439  const size_t id = shared->id;
440  const size_t P = shared->P;
441  const size_t nnz = shared->original->size();
442  pthread_mutex_t *mutex = shared->mutex;
443  pthread_cond_t *cond = shared->cond;
444 
445  //check whether pinned to exactly one unit of execution
446  cpu_set_t mask;
447  CPU_ZERO( &mask );
448  pthread_getaffinity_np( pthread_self(), sizeof( cpu_set_t ), &mask );
449  for( size_t s=0; s<P; s++ ) {
450  if( s==id ) continue;
451  if( CPU_ISSET( s, &mask ) ) {
452  std::cerr << "Thread " << id << " mask is larger than one core" << " (" << s << " is set)!" << std::endl;
453  exit( 1 );
454  }
455  }
456 #ifndef NDEBUG
457  std::cout << "Phase 1 at thread " << shared->id << ": cache Hilbert values and get maximum one" << std::endl;
458 #endif
459  size_t h1, h2, max1, max2;
460  size_t *h2s = shared->nzb;
461  const size_t blocksize = (nnz % P) > 0 ? nnz / P + 1 : nnz / P;
462  Matrix2HilbertCoordinates::IntegerToHilbert( (*(shared->original))[ id*blocksize ].i() / max_m, (*(shared->original))[ id*blocksize ].j() / max_n, h1, h2 );
463  h2s[ id * blocksize ] = h2;
464  max1 = h1;
465  max2 = h2;
466  for( size_t i=id*blocksize+1; i<shared->original->size() && i < (id+1)*blocksize; ++i ) {
468  (*(shared->original))[ i ].j() / max_n, h1, h2 );
469  h2s[ i ] = h2;
470  if( h1 > max1 ) { max1 = h1; max2 = h2; }
471  else if( h1 == max1 && h2 > max2 ) max2 = h2;
472  }
473 
474  //check for too large ranges
475  if( max1 > 0 ) {
476  std::cerr << "Hilbert coordinate range is larger than 2^64-1. Current counting sort mechanism thus requires an array of 2^64 integers-->quitting." << std::endl;
477  //it is not possible to allocate such arrays on 64-bit machines. Also, it is extremely unlikely counting sort is the most efficient sort when this happens--
478  //very likely most Beta blocks will be empty and a normal quicksort is more appropriate.
479  exit( 1 );
480  }
481 
482  //communicate local maxima
483  h2s[ nnz + id ] = max2; //false sharing, but well...
484 
485  //sync
486  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
487 
488  //determine global maximum
489  for( size_t i=0; i<P; ++i )
490  if( h2s[ shared->original->size() + i ] > max2 )
491  max2 = h2s[ shared->original->size() + i ];
492  max2++; //number of betablocks is maximum index plus 1
493 
494  //check for block size appropriateness
495  unsigned char proceed;
496  if( max2 >= nnz ) {
497 #ifndef NDEBUG
498  std::cout << "Number of beta_m x beta_n blocks exceeds actual number of nonzeroes; going for (sequential) quicksort implementation." << std::endl;
499 #endif
500  proceed = 0;
501  } else {
502 #ifndef NDEBUG
503  std::cout << "Choosing counting-sort implementation." << std::endl;
504 #endif
505  proceed = 1;
506  }
507 
508  // These values are to be filled
509  size_t m, n, start, end;
510  std::vector< std::vector< Triplet< T > > > beta;
511 
512  if( proceed == 0 ) {
513  //temporary array
514  size_t *horig = NULL;
515 
516  //sequential quicksort O(nz*log(nz))
517  if( id == 0 ) {
518  horig = new size_t[ shared->original->size() ];
519  for( size_t i=0; i<shared->original->size(); ++i )
520  horig[ i ] = h2s[ i ];
521  qsort( h2s, nnz, sizeof( size_t ), beta_uli_compare );
522  assert( h2s[ nnz-1 ]+1 == max2 );
523  }
524  //sync
525  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
526  //determine distribution
527  /*for( size_t i = blocksize, k=0; i < shared->original->size(); i += blocksize, ++k ) {
528  const size_t target = h2s[ i ];
529  size_t split = i;
530  for( size_t dev_f = 0, dev_b = 0; ; dev_f++, dev_b-- ) {
531  if( i + dev_f < shared->original->size() ) {
532  if( h2s[ i + dev_f ] != target ) {
533  split = i + dev_f - 1;
534  i += dev_f / 2;
535  break;
536  }
537  } else if( i - dev_b >= 0 ) {
538  if( h2s[ i - dev_b ] != target ) {
539  split = i - dev_b;
540  i += dev_b / 2;
541  break;
542  }
543  } else
544  break;
545  }
546  if( shared->id == k ) {
547  std::cout << "Processor " << shared->id << " chooses to split at " << split << " with h=" << h2s[ split ] << " while target=" << target << std::endl;
548  start = h2s[ split ];
549  } else if( shared->id + 1 == k ) {
550  std::cout << "Processor " << shared->id << " chooses to end at " << split << " with h=" << h2s[ split ] << "." << std::endl;
551  end = h2s[ split ];
552  }
553  }
554  if( shared->id == shared->P - 1 ) end = h2s[ shared->original->size() - 1 ];*/
555  start = id * blocksize;
556  end = start + blocksize;
557  if( end > max2 ) end = max2;
558  const size_t target_s = h2s[ start ];
559  while( start < max2 && h2s[ start ] == target_s ) start--;
560  start++;
561  const size_t target_e = h2s[ end ];
562  while( end < max2 && h2s[ end ] == target_e ) end--;
563  end++;
564  if( shared->id + 1 == shared->P ) end = nnz - 1;
565  start = h2s[ start ];
566  end = h2s[ end ];
567 #ifndef NDEBUG
568  std::cout << "Processor " << id << " range is " << start << " to " << end << ", max = " << max2 << std::endl;
569 #endif
570  assert( start <= end );
571  assert( end <= max2 );
572 
573  //build local data structure from the triplets in shared->original
574  //and beta block hilbert index to beta translation map
575  std::map< size_t, size_t > h2b;
576 
577  //prepare beta and map
578  //get first hilbert block coordinate
579  h2 = h2s[ 0 ];
580  //create corresponding vector in beta
581  beta.push_back( std::vector< Triplet< T > >() );
582  //remember index
583  h2b[ h2 ] = 0;
584  //keep next index
585  size_t c = 1;
586  //do the same for the remainder of available hilbert coordinates
587  for( size_t i=1; i<shared->original->size(); ++i ) {
588  h2 = h2s[ i ];
589  beta.push_back( std::vector< Triplet< T > >() );
590  h2b[ h2 ] = c++;
591  }
592 
593  //prepare to get matrix size (m, n) and some statistics too
594  size_t smin_m = SIZE_MAX;
595  size_t smin_n = SIZE_MAX;
596  size_t smax_m, smax_n;
597  smax_m = smax_n = m = n = 0;
598  size_t *ms = new size_t[ end - start ];
599  size_t *ns = new size_t[ end - start ];
600  size_t *minms = new size_t[ end - start ];
601  size_t *minns = new size_t[ end - start ];
602  for( size_t i=0; i<end-start; i++ ) {
603  ms[ i ] = 0;
604  ns[ i ] = 0;
605  minms[ i ] = SIZE_MAX;
606  minns[ i ] = SIZE_MAX;
607  }
608 
609  if( id == 0 ) {
610  for( size_t i=0; i<nnz; i++ )
611  h2s[ i ] = horig[ i ];
612  delete [] horig;
613  }
614 
615  for( size_t i=0; i<shared->original->size(); i++ ) {
616  const ULI row = (*(shared->original))[ i ].i();
617  const ULI col = (*(shared->original))[ i ].j();
618  h2 = h2s[ i ];
619  if( row > m ) m = row;
620  if( col > n ) n = col;
621  if( h2 >= start && h2 < end ) {
622  if( row > ms[ h2-start ] ) ms[ h2-start ] = row;
623  if( col > ns[ h2-start ] ) ns[ h2-start ] = col;
624  if( row < minms[ h2-start ] ) minms[ h2-start ] = row;
625  if( col < minns[ h2-start ] ) minns[ h2-start ] = col;
626  if( row < smin_m ) smin_m = row;
627  if( col < smin_n ) smin_n = col;
628  if( row > smax_m ) smax_m = row;
629  if( col > smax_n ) smax_n = col;
630  }
631  }
632  //matrix size is 1 plus max index
633  m++;
634  n++;
635 
636  //sync
637  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
638 
639  for( size_t i=0; i<shared->original->size(); i++ ) {
640  h2 = h2s[ i ];
641  assert( h2 < max2 );
642  if( h2 >= start && h2 < end ) {
643  beta[ h2b[ h2 ] ].push_back( (*(shared->original))[ i ] );
644  }
645  }
646 
647  size_t cursum = 0;
648  for( size_t i=0; i<beta.size(); i++ )
649  cursum += beta[i].size();
650 #ifndef NDEBUG
651  std::cout << "Thread " << shared->id << ": " << smin_m << "," << smax_m << " times " << smin_n << "," << smax_n << " holding " << cursum << " nonzeroes." << std::endl;
652 #endif
653 
654  //sync
655  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
656 
657  //remove temporary values
658  delete [] ms;
659  delete [] ns;
660  delete [] minms;
661  delete [] minns;
662 
663  } else if( proceed == 1 ) {
664  //start counting sort
665 #ifndef NDEBUG
666  std::cout << "Phase 2: count nonzeroes in each beta_m x beta_n block (counting sort step 1), and derive distribution" << std::endl;
667 #endif
668  size_t *nzc = new size_t[ max2 ];
669  for( size_t i = 0; i < max2; ++i ) nzc[ i ] = 0;
670  for( size_t i=id*blocksize; i<nnz && i < (id+1)*blocksize; ++i ) nzc[ h2s[ i ] ]++;
671  shared->nzc[ id ] = nzc;
672 
673  //sync
674  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
675 
676  size_t sum = 0;
677 #ifndef NDEBUG
678  //sanity check
679  if( id == 0 ) {
680  for( size_t i=0; i<max2; i++ )
681  for( size_t s=0; s<P; s++ )
682  sum += shared->nzc[ s ][ i ];
683  assert( sum == nnz );
684  sum = 0;
685  }
686  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
687 #endif
688 
689  //combine counting sort
690  for( size_t i=id*(max2/P+1); i<(id+1)*(max2/P+1)&&i<max2; ++i ) {
691  for( size_t s=1; s<P; ++s )
692  shared->nzc[ 0 ][ i ] += shared->nzc[ s ][ i ];
693  }
694 
695  //sync
696  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
697 
698 #ifndef NDEBUG
699  //sanity check
700  sum = 0;
701  for( size_t i=0; i<max2; i++ ) sum += shared->nzc[ 0 ][ i ];
702  assert( sum == nnz );
703  sum = 0;
704 #endif
705 
706  //get start and end positions of processor 0
707  const size_t nnz_target = nnz != 0 ? nnz / P + 1 : nnz / P;
708  size_t start = 0;
709  size_t end = 1;
710  size_t cursum = shared->nzc[ 0 ][ 0 ];
711  for( ; end<max2 && cursum < nnz_target; end++ )
712  cursum += shared->nzc[ 0 ][ end ];
713 
714  //get my start and end position
715  for( size_t i=0; i<P; i++ ) {
716  if( shared->id == i ) {
717  std::cout << "Thread " << i << ": local nnz count is " << cursum << " (storing block indices " << start << ", " << end << ") out of " << max2 << " blocks present." << std::endl;
718  break;
719  }
720  sum += cursum;
721  start = end;
722  cursum = 0;
723  for( ; end < max2 && ( (i == P-1) || (cursum < nnz_target) ); end++ )
724  cursum += shared->nzc[ 0 ][ end ];
725  }
726 
727 #ifndef NDEBUG
728  std::cout << "Phase 3 at processor " << shared->id << ": getting local nonzeroes (counting sort step 2)" << std::endl;
729 #endif
730 
731  //build local data structure from the triplets in shared->original
732  beta.resize( end - start );
733  for( size_t i=0; i<end-start; i++ )
734  beta[ i ] = std::vector< Triplet< T > >( shared->nzc[ 0 ][ i + start ] );
735 
736  //prepare to get matrix size (m, n) and some statistics too
737  size_t smin_m, smin_n;
738  size_t smax_m, smax_n;
739  smin_m = smin_n = ULONG_MAX;
740  smax_m = smax_n = m = n = 0;
741  size_t *ms = new size_t[ end - start ];
742  size_t *ns = new size_t[ end - start ];
743  size_t *minms = new size_t[ end - start ];
744  size_t *minns = new size_t[ end - start ];
745  for( size_t i=0; i<end-start; i++ ) {
746  ms[ i ] = 0;
747  ns[ i ] = 0;
748  minms[ i ] = ULONG_MAX;
749  minns[ i ] = ULONG_MAX;
750  }
751 
752  //copy local elements
753  for( size_t i=0; i<end-start; i++ )
754  assert( beta[ i ].size() == shared->nzc[ 0 ][ i + start ] );
755 
756  for( size_t i=0; i<nnz; ++i ) {
757  const ULI row = (*(shared->original))[ i ].i();
758  const ULI col = (*(shared->original))[ i ].j();
759  h2 = shared->nzb[ i ];
760  if( row > m ) m = row;
761  if( col > n ) n = col;
762  if( h2 >= start && h2 < end ) {
763  if( row > ms[ h2-start ] ) ms[ h2-start ] = row;
764  if( col > ns[ h2-start ] ) ns[ h2-start ] = col;
765  if( row < minms[ h2-start ] ) minms[ h2-start ] = row;
766  if( col < minns[ h2-start ] ) minns[ h2-start ] = col;
767  if( row < smin_m ) smin_m = row;
768  if( col < smin_n ) smin_n = col;
769  if( row > smax_m ) smax_m = row;
770  if( col > smax_n ) smax_n = col;
771  }
772  }
773  //matrix size is 1 plus max index
774  m++;
775  n++;
776 
777  //sync here because the following phase will modify nzc,
778  //while it may still be used above (and assumed constant)
779  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
780 
781  for( size_t i=0; i<nnz; ++i ) {
782 #ifndef NDEBUG
783  const ULI row = (*(shared->original))[ i ].i();
784  const ULI col = (*(shared->original))[ i ].j();
785 #endif
786  h2 = shared->nzb[ i ];
787  assert( h2 < max2 );
788  if( h2 >= start && h2 < end ) {
789 #ifndef NDEBUG
790  //sanity check: this nonzero should be grouped with other nonzeroes in the same block
791  if( beta[ h2-start ].size() > shared->nzc[ 0 ][ h2 ] ) {
792  const ULI temp = h2;
793  Matrix2HilbertCoordinates::IntegerToHilbert( beta[ h2-start ][ shared->nzc[ 0 ][ h2 ] ].i() / max_m,
794  beta[ h2-start ][ shared->nzc[ 0 ][ h2 ] ].j() / max_n,
795  h1, h2 );
796  assert( h1 == 0 );
797  assert( h2 == temp );
798  h2 = temp;
799  assert( row / max_m == beta[ h2-start ][ shared->nzc[ 0 ][ h2 ] ].i() / max_m );
800  assert( col / max_n == beta[ h2-start ][ shared->nzc[ 0 ][ h2 ] ].j() / max_n );
801  }
802 #endif
803  beta[ h2-start ][ --(shared->nzc[ 0 ][ h2 ]) ] = (*(shared->original))[ i ];
804  assert( row == beta[ h2-start ][ shared->nzc[ 0 ][ h2 ] ].i() );
805  assert( col == beta[ h2-start ][ shared->nzc[ 0 ][ h2 ] ].j() );
806  }
807  }
808 
809 #ifndef NDEBUG
810  std::cout << "Thread " << shared->id << ": " << smin_m << "," << smax_m << " times " << smin_n << "," << smax_n << std::endl;
811 #endif
812 
813  //sanity check
814  for( size_t i=0; i<end-start; i++ ) {
815  assert( shared->nzc[ 0 ][ i + start ] == 0 );
816  if( beta[ i ].size() == 0 ) continue;
817  if( ms[ i ] - minms[ i ] >= max_m ) {
818  std::cerr << "BetaHilbert thread construction: rowwise range (" << (ms[ i ]) << " to " << minms[ i ] << ") over maximum size! (h2=" << (i+start) << ")" << std::endl;
819  exit( 1 );
820  }
821  if( ns[ i ] - minns[ i ] >= max_n ) {
822  std::cerr << "BetaHilbert thread construction: columnwise range over maximum size!" << std::endl;
823  exit( 1 );
824  }
825  }
826 
827  //sync
828  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
829 
830  //clear local temporary array
831  delete [] nzc;
832 
833  //remove temporary values
834  delete [] ms;
835  delete [] ns;
836  delete [] minms;
837  delete [] minns;
838  } else {
839  std::cerr << "Invalid value for proceed: " << proceed << std::endl;
840  exit( 1 );
841  }
842 
843  //load into FBICRS
844 #ifndef NDEBUG
845  std::cout << "Processor " << shared->id << ": loading into local FBICRS data structure..." << std::endl;
846 #endif
847  FBICRS< T > dss( beta, m, n );
848  beta.clear();
849 
850  //remember data use
851  shared->bytes = dss.bytesUsed();
852 
853  //create local shadow of y to avoid write-contention
854  T* y = NULL;
855  if( BH_USE_GLOBAL_Y && id == 0 )
856  std::cerr << "Warning: thread 0 will use global y as local y (see BH_USE_GLOBAL_Y)" << std::endl;
857  if( !BH_USE_GLOBAL_Y || shared->id > 0 ) { //id 0 will take global output vector for local y
858  y = new T[ shared->output_vector_size ];
859  for( size_t i=0; i<shared->output_vector_size; i++ )
860  y[ i ] = 0.0;
861  }
862 
863  //make local y known to outside world
864  shared->local_y = y;
865 
866  //exit construction mode
867  shared->mode = 0;
868 
869  //signal end of construction
870  pthread_mutex_lock( mutex );
871  BetaHilbert::end( shared->end_mutex, shared->end_cond, shared->end_sync, shared->P );
872 
873  //enter daemon mode
874 #ifndef NDEBUG
875  std::cout << "Thread " << id << " construction done, entering daemon mode." << std::endl;
876 #endif
877  while( true ) {
878  struct timespec clk_start, clk_stop;
879  pthread_cond_wait( cond, mutex );
880  pthread_mutex_unlock( mutex );
881 
882  if( shared->mode == 4 ) break;
883 
884  switch( shared->mode ) {
885  case 5:
886  if( !(BH_USE_GLOBAL_Y && shared->id == 0) ) {
887  for( size_t i = 0; i < shared->output_vector_size; ++i ) {
888  shared->local_y[ i ] = 0;
889  }
890  }
891  break;
892  case 3:
893  assert( *(shared->input) != NULL );
894  assert( *(shared->output) != NULL );
895  if( BH_USE_GLOBAL_Y && shared->id == 0 ) {
896  y = *(shared->output);
897  shared->local_y= y;
898  }
899  assert( y != NULL );
900 
901  clock_gettime( global_clock_id, &clk_start);
902  shared->time = 0.0;
903  for( size_t i=0; i<shared->repeat; i++ ) {
904  dss.zxa_fb( *(shared->input), y );
905  if( BH_COLLECT == -1 )
906  break;
907  else if( BH_COLLECT != 0 )
908  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
909  collectY( shared );
910  }
911  clock_gettime( global_clock_id, &clk_stop);
912  shared->time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
913  shared->time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;
914  break;
915  case 2:
916  assert( *(shared->input) != NULL );
917  assert( *(shared->output) != NULL );
918  if( BH_USE_GLOBAL_Y && shared->id == 0 ) {
919  y = (*shared->output);
920  shared->local_y = y;
921  }
922  assert( y != NULL );
923  clock_gettime( global_clock_id, &clk_start);
924  shared->time = 0.0;
925  for( size_t i=0; i<shared->repeat; i++ ) {
926  dss.zax_fb( *(shared->input), y );
927  if( BH_COLLECT == -1 )
928  continue;
929  else if( BH_COLLECT != 0 )
930  BetaHilbert::synchronise( mutex, cond, shared->sync, shared->P );
931  collectY( shared );
932  }
933  clock_gettime( global_clock_id, &clk_stop);
934  shared->time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
935  shared->time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;
936  break;
937  default:
938  std::cout << "Thread " << id << ": Error, undefined operation (" << shared->mode << ")!" << std::endl;
939  exit( -1 );
940  }
941  shared->mode = 0;
942 
943  //signal end of operation
944  pthread_mutex_lock( mutex );
945  BetaHilbert::end( shared->end_mutex, shared->end_cond, shared->sync, shared->P );
946  }
947 
948  //done
949  if( !BH_USE_GLOBAL_Y || shared->id > 0 )
950  delete [] y;
951  return (NULL);
952  }
953 
959  static void collectY( shared_data<T> *shared ) {
960  const ULI s = shared->id;
961  const ULI p = shared->P;
962  const ULI m = shared->output_vector_size;
963  shared_data<T> *datas = shared; datas -= s;
964 
965  if( BH_COLLECT == 0 ) {
966  //do nothing, will be handled after SPMD code
967  } else if( BH_COLLECT == 1 ) {
968  //use block distribution
969  const ULI blocksize = (m % p == 0 ) ? m / p : m / p + 1;
970  const ULI m_start = s * blocksize;
971  const ULI m_end = m_start + blocksize > m ? m : m_start + blocksize;
972 
973  if( s == p-1 ) assert( m_end == shared->output_vector_size );
974  //do collect items of own block
975  for( size_t i = m_start; i<m_end; i++ ) {
976  for( size_t k = BH_USE_GLOBAL_Y; k < p; k++ ) { //start at k=1 when local y of proc. 0 is global y itself
977  (*(shared->output))[ i ] += datas[ k ].local_y[ i ];
978  }
979  }
980  } else if( BH_COLLECT == 2 ) {
981  //parallel reducing collect
982  size_t step_p = BH_REDUCE_BASE;
983  size_t prev_step_p = 1;
984  const ULI blocksize = (m % p == 0 ) ? m / p : m / p + 1;
985  const ULI m_start = s * blocksize;
986  const ULI m_end = m_start + blocksize > m ? m : m_start + blocksize;
987  while( prev_step_p < p ) {
988  for( size_t start_p = 0; start_p < p; start_p += step_p ) {
989  //do collect items of own block
990  for( size_t k = start_p + prev_step_p; k < p && k < start_p + step_p; k += prev_step_p ) {
991  for( size_t i = m_start; i<m_end; i++ )
992  datas[ start_p ].local_y[ i ] += datas[ k ].local_y[ i ];
993  }
994  }
995  prev_step_p = step_p;
996  step_p *= BH_REDUCE_BASE;
997  }
998  //if datas[0].local_y != BetaHilbert<T>::output, one final step is required:
999  if( !BH_USE_GLOBAL_Y )
1000  for( size_t i = m_start; i<m_end; i++ )
1001  (*(shared->output))[ i ] += datas[ 0 ].local_y[ i ];
1002  } else if( BH_COLLECT == 3 ) {
1003  size_t step_p = 2;
1004  size_t prev_step_p = 1;
1005  while( prev_step_p < p ) {
1006  for( size_t start_p = 0; start_p < p; start_p += step_p ) {
1007  if( prev_step_p > 1 )
1008  BetaHilbert::synchronise( shared->mutex, shared->cond, shared->sync, shared->P );
1009  if( start_p != s ) continue;
1010  for( size_t k = start_p + prev_step_p; k < p && k < start_p + step_p; k += prev_step_p )
1011  for( size_t i = 0; i < m; i++ )
1012  datas[ start_p ].local_y[ i ] += datas[ k ].local_y[ i ];
1013  }
1014  prev_step_p = step_p;
1015  step_p *= 2;
1016  }
1017  //if datas[0].local_y != BetaHilbert<T>::output, one final step is required:
1018  if( !BH_USE_GLOBAL_Y && s == 0 )
1019  for( size_t i = 0; i < m; i++ )
1020  (*(shared->output))[ i ] += datas[ 0 ].local_y[ i ];
1021  } else {
1022  std::cerr << "Error: output vector collection strategy " << BH_COLLECT << " not implemented!" << std::endl;
1023  exit( 1 );
1024  }
1025  }
1026 
1033  virtual void zxa( const T* x, T* z ) {
1034  zxa( x, z, 1 );
1035  }
1036 
1044  virtual void zxa( const T* x, T* z, const size_t repeat ) {
1045  //set all daemon threads to do zxa
1046  for( size_t i=0; i<P; i++ ) {
1047  thread_data[ i ].mode = 3;
1048  thread_data[ i ].repeat = repeat;
1049  }
1050 
1051  //set input vector
1052  input = x;
1053 
1054  //set output vector
1055  output = z;
1056 
1057  //wake up all daemon threads
1058  pthread_mutex_lock( &end_mutex );
1059  pthread_mutex_lock( &mutex );
1060  pthread_cond_broadcast( &cond );
1061  pthread_mutex_unlock( &mutex );
1062 
1063  //wait for end of operation
1064  wait();
1065 
1066  //sequential final collect method
1067  if( BH_COLLECT == 0 )
1068  for( size_t i=0; i<this->nor; i++ )
1069  for( size_t s=BH_USE_GLOBAL_Y; s<P; s++ ) //start with s=1 when local y of proc 0 is global y (or rather z) itself
1070  z[ i ] += thread_data[ s ].local_y[ i ];
1071 
1072  //unset vectors
1073  input = NULL;
1074  output = NULL;
1075  }
1076 
1080  void reset() {
1081  //reset all local output vectors
1082  for( size_t i=0; i<P; i++ ) {
1083  thread_data[ i ].mode = 5;
1084  }
1085 
1086  //wake up all daemon threads
1087  pthread_mutex_lock( &end_mutex );
1088  pthread_mutex_lock( &mutex );
1089  pthread_cond_broadcast( &cond );
1090  pthread_mutex_unlock( &mutex );
1091 
1092  //wait for end of operation
1093  wait();
1094  }
1095 
1102  virtual T* mv( const T* x ) {
1103  //over-allocate in case we use matrices that are too small to be
1104  //vectorised internally (assuming we are using vecBICRS as sub_ds).
1105  size_t allocsize = (this->nor + 1) * sizeof( T );
1106  //allocate, either using libnuma (to interleave for i86)
1107  //or using dynamic aligned allocs (for Xeon Phi MICs)
1108 #ifndef _NO_LIBNUMA
1109  T* ret = (T*) numa_alloc_interleaved( allocsize );
1110 #else
1111  T* ret = (T*) _mm_malloc( allocsize, 64 ); //instead of `new T[ nor ];'
1112 #endif
1113  //set to 0-vector
1114  for( ULI i=0; i<this->nor; i++ ) {
1115  ret[ i ] = this->zero_element;
1116  }
1117 
1118  //reset output vectors
1119  reset();
1120 
1121  //do in-place SpMV
1122  zax( x, ret );
1123 
1124  //return new output vector
1125  return ret;
1126  }
1127 
1134  virtual void zax( const T* x, T* z ) {
1135  zax( x, z, 1, 0, NULL );
1136  }
1137 
1147  virtual void zax( const T* x, T* z, const size_t repeat, const clockid_t clock_id, double *elapsed_time ) {
1148  //set all daemon threads to do zax
1149  for( size_t i=0; i<P; i++ ) {
1150  thread_data[ i ].mode = 2;
1151  thread_data[ i ].repeat = repeat;
1152  }
1153 
1154  //set global clock ID
1155  global_clock_id = clock_id;
1156 
1157  //set input vector
1158  input = x;
1159 
1160  //set output vector
1161  output = z;
1162 
1163  //wake up all daemon threads
1164  pthread_mutex_lock( &end_mutex );
1165  pthread_mutex_lock( &mutex );
1166  pthread_cond_broadcast( &cond );
1167  pthread_mutex_unlock( &mutex );
1168 
1169  //wait for end of operation
1170  wait();
1171 
1172  //sequential final collect method
1173  if( BH_COLLECT == 0 )
1174  for( size_t i=0; i<this->nor; i++ )
1175  for( size_t s=BH_USE_GLOBAL_Y; s<P; s++ ) //start with s=1 when local y of proc 0 is global y (or rather z) itself
1176  z[ i ] += thread_data[ s ].local_y[ i ];
1177 
1178  //get elapsed time
1179  double maxtime = 0.0;
1180  for( size_t i=0; i<P; i++ ) {
1181  const double curtime = thread_data[ i ].time;
1182  if( curtime > maxtime ) maxtime = curtime;
1183  }
1184  if( elapsed_time != NULL )
1185  *elapsed_time += maxtime;
1186 
1187  //unset vectors
1188  input = NULL;
1189  output = NULL;
1190  }
1191 
1193  virtual void getFirstIndexPair( ULI &i, ULI &j ) {
1194  std::cerr << "Warning: BetaHilbert::getFirstIndexPair has no unique answer since it implements a parallel multiplication!\nIgnoring call..." << std::endl;
1195  }
1196 
1198  virtual size_t bytesUsed() {
1199  size_t ret = 0;
1200  for( size_t s = 0; s < P; ++s )
1201  ret += thread_data[ s ].bytes;
1202  return ret;
1203  }
1204 
1205 };
1206 
1207 template< typename T > size_t BetaHilbert< T >::P = 0;
1208 
1209 template< typename T > clockid_t BetaHilbert< T >::global_clock_id = 0;
1210 
#endif

Hierarchical BICRS with fixed subblock size and distribution.
Definition: FBICRS.hpp:75
ULI nnz
Number of non-zeros.
Definition: SparseMatrix.hpp:58
size_t end_sync
Used for construction end signal.
Definition: BetaHilbert.hpp:238
virtual ~BetaHilbert()
Base deconstructor.
Definition: BetaHilbert.hpp:287
shared_data()
Base constructor.
Definition: BetaHilbert.hpp:154
double time
Will store local timing.
Definition: BetaHilbert.hpp:115
The Beta Hilbert triplet scheme.
Definition: BetaHilbert.hpp:195
virtual void zxa(const T *x, T *z, const size_t repeat)
Computes z=xA in place, a given number of times successively.
Definition: BetaHilbert.hpp:1044
void set_p_translate(std::vector< unsigned short int > *_p_translate)
Sets p_translate to 0..P-1 by default, or equal to the optionally supplied vector.
Definition: BetaHilbert.hpp:247
virtual void zxa_fb(const _t_value *__restrict__ x_p, _t_value *__restrict__ y_p)
Interleaved zxa kernel for use with the BetaHilbert scheme.
Definition: FBICRS.hpp:501
size_t P
Total number of processors.
Definition: BetaHilbert.hpp:97
pthread_mutex_t * end_mutex
Mutex used for end sync.
Definition: BetaHilbert.hpp:127
shared_data(size_t _id, size_t _P, std::vector< Triplet< double > > *_original, size_t *_nzb, size_t **_nzc, pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond, size_t *_sync, size_t *_end_sync, size_t _m, const T **_in, T **_out)
Default constructor.
Definition: BetaHilbert.hpp:177
static clockid_t global_clock_id
Clock type used for thread-local timing.
Definition: BetaHilbert.hpp:220
static size_t P
Number of threads to fire up.
Definition: BetaHilbert.hpp:202
size_t sync
Used for synchronising threads.
Definition: BetaHilbert.hpp:235
T * output
Output vector.
Definition: BetaHilbert.hpp:211
ULI i() const
Definition: Triplet.hpp:70
const T * input
Input vector.
Definition: BetaHilbert.hpp:208
shared_data< T > * thread_data
array of initial thread data
Definition: BetaHilbert.hpp:217
unsigned long int cores() const
The number of available cores.
Definition: MachineInfo.cpp:77
virtual unsigned long int m()
Queries the number of rows this matrix contains.
Definition: SparseMatrix.hpp:107
pthread_cond_t end_cond
Wait for end mechanism: condition.
Definition: BetaHilbert.hpp:232
static void collectY(shared_data< T > *shared)
Code that collects a distributed output vector.
Definition: BetaHilbert.hpp:959
virtual void getFirstIndexPair(ULI &i, ULI &j)
Definition: BetaHilbert.hpp:1193
static const MachineInfo & getInstance()
Gets a singleton instance.
Definition: MachineInfo.cpp:38
Shared data for BetaHilbert threads.
Definition: BetaHilbert.hpp:89
size_t ** nzc
Will contain the nonzero counts of separate blocks.
Definition: BetaHilbert.hpp:112
static void IntegerToHilbert(const size_t i, const size_t j, size_t &h1, size_t &h2)
New method, October 2010.
Definition: Matrix2HilbertCoordinates.cpp:48
pthread_mutex_t end_mutex
Wait for end mechanism: mutex.
Definition: BetaHilbert.hpp:229
const T ** input
Array of all local input vectors of all SPMD processes.
Definition: BetaHilbert.hpp:148
size_t repeat
how many times to repeat the operation set in `mode' (above, only for 2 and 3)
Definition: BetaHilbert.hpp:103
pthread_cond_t * cond
Condition used for synchronisation.
Definition: BetaHilbert.hpp:124
size_t * nzb
Will cache block numbers of nonzeroes.
Definition: BetaHilbert.hpp:109
void loadFromFile(const std::string file, const T zero=0)
Function which loads a matrix from a matrix market file.
Definition: SparseMatrix.hpp:89
pthread_mutex_t mutex
Stop/continue mechanism: mutex.
Definition: BetaHilbert.hpp:223
Interface common to all sparse matrix storage schemes.
Definition: SparseMatrix.hpp:46
T * local_y
Pointer to the local output vector.
Definition: BetaHilbert.hpp:145
std::vector< unsigned short int > p_translate
Which processors to pin threads to.
Definition: BetaHilbert.hpp:205
size_t output_vector_offset
Offset of the local output vector compared to global indices.
Definition: BetaHilbert.hpp:142
virtual void zxa(const T *x, T *z)
Computes z=xA in place.
Definition: BetaHilbert.hpp:1033
static const ULI max_n
Given FBICRS, the maximum value for columnwise matrix size.
Definition: BetaHilbert.hpp:241
unsigned char mode
0 undef, 1 init, 2 zax, 3 zxa, 4 exit, 5 reset
Definition: BetaHilbert.hpp:100
ULI noc
Number of columns.
Definition: SparseMatrix.hpp:55
size_t * end_sync
Counter used for end sync.
Definition: BetaHilbert.hpp:136
virtual void zax(const T *x, T *z, const size_t repeat, const clockid_t clock_id, double *elapsed_time)
Computes z=Ax in place, a given number of times in succession, and measures the time taken...
Definition: BetaHilbert.hpp:1147
static void synchronise(pthread_mutex_t *mutex, pthread_cond_t *cond, size_t *sync, const size_t P)
Thread synchronisation function.
Definition: BetaHilbert.hpp:420
pthread_cond_t cond
Stop/continue mechanism: condition.
Definition: BetaHilbert.hpp:226
static const ULI max_m
Given FBICRS, the maximum value for the rowwise matrix size, assuming short ints on ICRS at the lower...
Definition: BetaHilbert.hpp:244
size_t id
Thread ID.
Definition: BetaHilbert.hpp:94
virtual size_t bytesUsed()
Function to query the amount of storage required by this sparse matrix.
Definition: FBICRS.hpp:664
static void end(pthread_mutex_t *mutex, pthread_cond_t *cond, size_t *sync, const size_t P)
End synchronisation function.
Definition: BetaHilbert.hpp:402
pthread_t * threads
pthreads associated with this data structure
Definition: BetaHilbert.hpp:214
virtual size_t bytesUsed()
Definition: BetaHilbert.hpp:1198
pthread_mutex_t * mutex
Mutex used for synchronisation.
Definition: BetaHilbert.hpp:121
ULI nor
Number of rows.
Definition: SparseMatrix.hpp:52
void reset()
Reset all local output vectors to zero.
Definition: BetaHilbert.hpp:1080
T zero_element
The element considered to be zero.
Definition: SparseMatrix.hpp:63
size_t bytes
Local memory use.
Definition: BetaHilbert.hpp:118
void wait()
Caller will wait until the end of the SpMV.
Definition: BetaHilbert.hpp:310
std::vector< Triplet< T > > * original
Array of local sparse blocks.
Definition: BetaHilbert.hpp:106
virtual void zax_fb(const _t_value *__restrict__ x_p, _t_value *__restrict__ y_p)
Interleaved zax kernel for use with the BetaHilbert scheme.
Definition: FBICRS.hpp:617
pthread_cond_t * end_cond
Condition used for end sync.
Definition: BetaHilbert.hpp:130
size_t * sync
Counter used for synchronisation.
Definition: BetaHilbert.hpp:133
virtual T * mv(const T *x)
Overloaded mv call; allocates output vector using numa_interleaved.
Definition: BetaHilbert.hpp:1102
virtual unsigned long int n()
Queries the number of columns this matrix contains.
Definition: SparseMatrix.hpp:115
A single triplet value.
Definition: Triplet.hpp:52
size_t output_vector_size
Length of the local output vector.
Definition: BetaHilbert.hpp:139
T ** output
Array of all output vectors local to all SPMD processes.
Definition: BetaHilbert.hpp:151
BetaHilbert(const std::string file, T zero=0, std::vector< unsigned short int > *_p_translate=NULL)
File-based constructor.
Definition: BetaHilbert.hpp:267
virtual void load(std::vector< Triplet< T > > &input, const ULI m, const ULI n, const T zero)
Loads from input COO matrix.
Definition: BetaHilbert.hpp:324
BetaHilbert(std::vector< Triplet< T > > &input, ULI m, ULI n, T zero=0, std::vector< unsigned short int > *_p_translate=NULL)
COO-based constructor.
Definition: BetaHilbert.hpp:281
virtual void zax(const T *x, T *z)
Computes z=Ax in place.
Definition: BetaHilbert.hpp:1134
static void * thread(void *data)
SPMD code for each thread doing an SpMV.
Definition: BetaHilbert.hpp:437