SparseLibrary  Version 1.6.0
RDCSB.hpp
/*
 * Copyright (c) 2007-2014, A. N. Yzelman, Utrecht University 2007-2011;
 * KU Leuven 2011-2014.
 * R. H. Bisseling, Utrecht University 2007-2014.
 *
 * This file is part of the Sparse Library.
 *
 * This library was developed under supervision of Prof. dr. Rob H. Bisseling at
 * Utrecht University, from 2007 until 2011. From 2011-2014, development continued
 * at KU Leuven, where Prof. dr. Dirk Roose contributed significantly to the ideas
 * behind the newer parts of the library code.
 *
 * The Sparse Library is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * The Sparse Library is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with the Sparse Library. If not, see <http://www.gnu.org/licenses/>.
 */


/*
 * File created by:
 * A. N. Yzelman, Dept. of Computer Science, KU Leuven, 2011.
 */


#include <iostream>
#include <vector>
#include <map>
#include <pthread.h>
#include <numa.h>

#include "SparseMatrix.hpp"
#include "Matrix2HilbertCoordinates.hpp"

#include "csb/utility.h"
#include "csb/triple.h"
#include "csb/csc.h"
#include "csb/bicsb.h"

#ifndef _H_RDCSB
#define _H_RDCSB
/*
 * When defined, thread 0 will use the global y vector for its local
 * computations. This introduces extra work in the form of one extra
 * synchronisation, and reduces the number of processors available for
 * the parallel collect by one. The advantage is less data replication
 * of the output vector.
 *
 * The synchronisation cannot be prevented. Using all processors in
 * the collect code is possible, but has not been implemented yet.
 */
//#define RDCSB_GLOBAL_Y
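//Note: with RDCSB_GLOBAL_Y enabled, thread 0 writes directly into the global
//output vector, so collectY() only has to reduce the remaining P-1 local copies.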

/*
 * When defined, RDCSB will not collect output results in the global
 * output vector passed to this library. Used for timing the true SpMV
 * speeds only.
 */
#define RDCSB_NO_COLLECT
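//Note: with RDCSB_NO_COLLECT defined (the current default), zax() and zxa()
//leave the global output vector untouched (except for thread 0's block when
//RDCSB_GLOBAL_Y is also enabled); undefine it to obtain the actual SpMV result.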

/** Shared data for the RDCSB threads. */
template< typename T >
class RDCSB_shared_data {

    public:

        /** Thread ID. */
        unsigned long int id;

        /** Total number of threads. */
        unsigned long int P;

        /** 0 undef, 1 init, 2 zax, 3 zxa, 4 exit */
        unsigned char mode;

        /** How many times to repeat the operation set in `mode' (only for modes 2 and 3). */
        unsigned long int repeat;

        /** Input triplets to load from. */
        std::vector< Triplet< T > > *original;

        /** Will store rowsums. */
        unsigned long int *nzb;

        /** Will store local timing. */
        double time;

        /** Stop/continue mechanism: mutex. */
        pthread_mutex_t* mutex;

        /** Stop/continue mechanism: condition. */
        pthread_cond_t* cond;

        /** Wait for end mechanism: mutex. */
        pthread_mutex_t* end_mutex;

        /** Wait for end mechanism: condition. */
        pthread_cond_t* end_cond;

        /** Synchronisation counters (for in-operation barriers and end signals, respectively). */
        unsigned long int *sync, *end_sync;

        /** Length of this thread's block of the output vector. */
        unsigned long int output_vector_size;

        /** Offset of this thread's block within the global output vector. */
        unsigned long int output_vector_offset;

        /** Local (replicated) output vector. */
        T *local_y;

        /** Base constructor. Initialises with invalid default arguments. */
        RDCSB_shared_data(): id( -1 ), P( -1 ), mode( 0 ), repeat( 0 ), original( NULL ), nzb( NULL ),
                time( 0 ),
                mutex( NULL ), cond( NULL ), end_mutex( NULL ), end_cond( NULL ),
                sync( NULL ), end_sync( NULL ),
                output_vector_size( -1 ), output_vector_offset( -1 ), local_y( NULL ) {}

        /** Recommended constructor. */
        RDCSB_shared_data( unsigned long int _id, unsigned long int _P,
                std::vector< Triplet< T > > *_original,
                unsigned long int *_nzb,
                pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond,
                unsigned long int *_sync, unsigned long int *_end_sync,
                unsigned long int _ovsize, unsigned long int _ovoffset ):
            id( _id ), P( _P ), mode( 1 ), repeat( 1 ), original( _original ), nzb( _nzb ),
            time( 0 ),
            mutex( _mutex ), cond( _cond ), end_mutex( _end_mutex ), end_cond( _end_cond ),
            sync( _sync ), end_sync( _end_sync ),
            output_vector_size( _ovsize ), output_vector_offset( _ovoffset ), local_y( NULL ) {}
};

/**
 * Full parallel row-distributed SpMV, based on CSB (BlockCRS + Morton curve + Cilk)
 * and PThreads.
 */
template< typename T >
class RDCSB: public SparseMatrix< T, ULI > {

    private:

    protected:

        /** Number of threads to fire up. */
        static unsigned long int P;

        /** Input vector. */
        static const T* input;

        /** Output vector. */
        static T* output;

        /** The pthreads associated to this data structure. */
        pthread_t *threads;

        /** Array of initial thread data. */
        RDCSB_shared_data< T > *thread_data;

        /** Clock type used for thread-local timing. */
        static clockid_t global_clock_id;

        /** Stop/continue mechanism: mutex. */
        pthread_mutex_t mutex;

        /** Stop/continue mechanism: condition. */
        pthread_cond_t cond;

        /** Wait for end mechanism: mutex. */
        pthread_mutex_t end_mutex;

        /** Wait for end mechanism: condition. */
        pthread_cond_t end_cond;

        /** Used for synchronising threads. */
        unsigned long int sync;

        /** Used for construction end signal. */
        unsigned long int end_sync;

    public:

        /** File-based constructor. */
        RDCSB( const std::string file, T zero ) {
            loadFromFile( file, zero );
        }

        /** Triplet-based constructor. */
        RDCSB( std::vector< Triplet< T > >& input, ULI m, ULI n, T zero ) {
            load( input, m, n, zero );
        }

        /** Base destructor; shuts down all daemon threads and frees shared data. */
        virtual ~RDCSB() {
            //set all daemon threads to exit mode
            for( unsigned long int i = 0; i < P; i++ )
                thread_data[ i ].mode = 4;

            //wake up all daemon threads
            pthread_mutex_lock( &mutex );
            pthread_cond_broadcast( &cond );
            pthread_mutex_unlock( &mutex );

            //allow threads to exit gracefully
            for( unsigned long int i = 0; i < P; i++ )
                pthread_join( threads[ i ], NULL );

            //destroy data
            delete [] thread_data;
            delete [] threads;
            pthread_mutex_destroy( &mutex );
            pthread_cond_destroy( &cond );
        }

        /** Waits for the end signal; the caller must hold end_mutex when calling this function. */
        void wait() {
            //wait for end signal
            pthread_cond_wait( &end_cond, &end_mutex );
            pthread_mutex_unlock( &end_mutex );
        }

        /** Loads a matrix from std::vector< Triplet< T > > format. */
        virtual void load( std::vector< Triplet< T > >& input, const ULI m, const ULI n, const T zero ) {
            //get number of cores available
            P = MachineInfo::getInstance().cores();

            //set kernel to local thread allocation if it wasn't already the case
            numa_set_localalloc();

            //base settings
            this->zero_element = zero;
            this->nor = m;
            this->noc = n;
            this->nnz = input.size();

            unsigned long int *nzb = new unsigned long int [ this->m() ];
            for( unsigned long int i = 0; i < m; i++ ) nzb[ i ] = 0;

            //create P threads :)
            this->threads = new pthread_t[ P ];
            //initialise local initialisation data
            thread_data = new RDCSB_shared_data< T >[ P ];
            //initialise mutexes and conditions and a synchronisation counter
            pthread_mutex_init( &mutex, NULL );
            pthread_cond_init ( &cond, NULL );
            pthread_mutex_init( &end_mutex, NULL );
            pthread_cond_init ( &end_cond, NULL );
            sync = 0;
            end_sync = 0;
            //lock end mutex (disallow threads that are created to signal for end
            //before this thread is done with spawning children)
            pthread_mutex_lock( &end_mutex );
            //go forth and multiply
            for( unsigned long int i = 0; i < P; i++ ) {
                //build thread-local init data
                thread_data[ i ] = RDCSB_shared_data< T >( i, P, &input, nzb, &mutex, &cond, &end_mutex, &end_cond, &sync, &end_sync, -1, -1 );
                //set fixed affinity for threads
                cpu_set_t mask;
                CPU_ZERO( &mask );
                CPU_SET ( i, &mask );

                //TODO: use hwloc for better NUMA-aware pinning
                /*hwloc_topology_t topology;
                hwloc_topology_init ( &topology );
                hwloc_topology_load( topology );
                hwloc_bitmap_t cpuset;*/

                //prepare attributes
                pthread_attr_t attr;
                pthread_attr_init( &attr );
                //set fixed affinity in attribute, so that the thread starts bound immediately
                pthread_attr_setaffinity_np( &attr, sizeof( cpu_set_t ), &mask );
                //fire up thread
                pthread_create( &threads[ i ], &attr, &RDCSB::thread, (void*) &thread_data[ i ] );
                //free attr
                pthread_attr_destroy( &attr );
            }

            //wait for threads to finish initialisation
            wait();

            //delete temporary array
            delete [] nzb;
        }

        /**
         * End-of-operation signal: the last of the P threads to arrive
         * signals the master thread waiting in wait(), and resets the counter.
         */
        static void end( pthread_mutex_t* mutex, pthread_cond_t* cond, unsigned long int *sync, const unsigned long int P ) {
            pthread_mutex_lock( mutex );
            (*sync)++;
            if( *sync == P ) {
                pthread_cond_signal( cond );
                *sync = 0;
            }
            pthread_mutex_unlock( mutex );
        }

        /**
         * Counting barrier: the last of the P threads to arrive resets the
         * counter and wakes up all the others.
         */
        static void synchronise( pthread_mutex_t* mutex, pthread_cond_t* cond, unsigned long int *sync, const unsigned long int P ) {
            pthread_mutex_lock( mutex );
            (*sync)++;
            if( *sync == P ) {
                *sync = 0;
                pthread_cond_broadcast( cond );
            } else
                pthread_cond_wait( cond, mutex );
            pthread_mutex_unlock( mutex );
        }

        /** SpMV daemon thread body. */
        static void* thread( void *data ) {
            //get short-hand notation
            RDCSB_shared_data< T > *shared = (RDCSB_shared_data< T >*) data;
            const unsigned long int id = shared->id;
            const unsigned long int P = shared->P;
            const unsigned long int nnz = shared->original->size();
            pthread_mutex_t *mutex = shared->mutex;
            pthread_cond_t *cond = shared->cond;

            //check that this thread is pinned to exactly one core, namely core `id'
            cpu_set_t mask;
            CPU_ZERO( &mask );
            pthread_getaffinity_np( pthread_self(), sizeof( cpu_set_t ), &mask );
            if( !CPU_ISSET( id, &mask ) ) {
                std::cerr << "Incorrect pinning for thread " << id << "!" << std::endl;
                exit( 1 );
            }
            for( unsigned long int s = 0; s < P; s++ ) {
                if( s == id ) continue;
                if( CPU_ISSET( s, &mask ) ) {
                    std::cerr << "Thread " << id << " mask is larger than one core (" << s << " is set)!" << std::endl;
                    exit( 1 );
                }
            }

            //prepare to get global matrix dimensions
            ULI m, n;
            m = n = 0;
            //put rowsums in nzb; each thread counts the rows in its own contiguous chunk of the row range
            const unsigned long int blocksize = (nnz % P) > 0 ? nnz / P + 1 : nnz / P;
            for( unsigned long int i = 0; i < nnz; i++ ) {
                const unsigned long int currow = (*(shared->original))[ i ].i();
                const unsigned long int curcol = (*(shared->original))[ i ].j();
                if( currow >= id * blocksize && currow < (id + 1) * blocksize )
                    shared->nzb[ currow ]++;
                if( currow > m ) m = currow;
                if( curcol > n ) n = curcol;
            }

            //dimensions are one higher than the maximum indices
            m++;
            n++;

            //sync
            RDCSB::synchronise( mutex, cond, shared->sync, shared->P );

            //determine distribution
            const unsigned long int nnz_target = nnz / P;
            unsigned long int cursum = 0;

            //first sanity check
            for( unsigned long int i = 0; i < m; i++ ) cursum += shared->nzb[ i ];
            assert( cursum == nnz );

            //continue
            cursum = 0;
            unsigned long int start, end, k = 0;
            start = end = -1;
            if( id == 0 ) start = 0;
            for( unsigned long int i = 0; i < m; i++ ) {
                cursum += shared->nzb[ i ];
                if( cursum >= nnz_target ) {
                    if( k == id ) end = i + 1;
                    if( k + 1 == id ) start = i + 1;
                    k++;
                    cursum = 0;
                }
            }
            assert( k == P - 1 );
            if( id == P - 1 ) end = m;
            shared->output_vector_size = end - start;
            shared->output_vector_offset = start;
            assert( shared->output_vector_size <= m );
            assert( shared->output_vector_offset + shared->output_vector_size <= m );
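            /*
             * Illustration (not from the original source): with nnz = 10 and
             * P = 2, nnz_target = 5. Scanning rows in order, cursum reaches 5
             * at some row i; thread 0 then owns rows [0, i+1) and thread 1
             * owns rows [i+1, m). In general, the greedy scan closes thread
             * k's range as soon as it has gathered at least nnz_target
             * nonzeroes, and the last thread always extends its range to m.
             */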

            //copy to local first
            std::vector< Triplet< T > > local;
            for( unsigned long int i = 0; i < nnz; i++ ) {
                const unsigned long int currow = (*(shared->original))[ i ].i();
                if( currow >= start && currow < end )
                    local.push_back(
                        Triplet< T >( (*(shared->original))[ i ].i() - start,
                            (*(shared->original))[ i ].j(),
                            (*(shared->original))[ i ].value )
                    );
            }
            const unsigned long int local_nnz = local.size();
            m = shared->output_vector_size; //new matrix size is new m times old n

            //transform to CSB-compatible format
            unsigned int *row = new unsigned int[ local_nnz ];
            unsigned int *col = new unsigned int[ local_nnz ];
            double *val = new double[ local_nnz ];
            for( unsigned long int i = 0; i < local_nnz; i++ ) {
                row[ i ] = local[ i ].i();
                col[ i ] = local[ i ].j();
                val[ i ] = local[ i ].value;
            }
            local.clear();

            //load into CSB
            BiCsb< T, unsigned > dss( local_nnz, m, n, row, col, val, 0, 0 );

            //delete transform arrays
            delete [] row;
            delete [] col;
            delete [] val;

            //create local shadow of y to avoid write-contention
            T* y = NULL;
#ifdef RDCSB_GLOBAL_Y
            if( id > 0 ) {
#endif
                y = new T[ shared->output_vector_size ];
                for( unsigned long int i = 0; i < shared->output_vector_size; i++ )
                    y[ i ] = 0.0;
#ifdef RDCSB_GLOBAL_Y
            }
#endif
            shared->local_y = y;

            //exit construction mode
            shared->mode = 0;

            //signal end of construction
            pthread_mutex_lock( mutex );
            RDCSB::end( shared->end_mutex, shared->end_cond, shared->end_sync, shared->P );

            //enter daemon mode
            while( true ) {
                struct timespec clk_start, clk_stop;
                pthread_cond_wait( cond, mutex );
                pthread_mutex_unlock( mutex );

                if( shared->mode == 4 ) break;

                switch( shared->mode ) {
                case 3:
                    assert( RDCSB< T >::input != NULL );
                    assert( RDCSB< T >::output != NULL );
#ifdef RDCSB_GLOBAL_Y
                    if( id == 0 ) {
                        y = RDCSB::output;
                        shared->local_y = y;
                    }
#endif
                    assert( y != NULL );
                    std::cerr << "CSB interface to z=x*A has not been implemented!" << std::endl;
                    exit( 1 );
#ifndef RDCSB_NO_COLLECT
                    collectY( shared );
#endif
                    break;
                case 2:
                    assert( RDCSB< T >::input != NULL );
                    assert( RDCSB< T >::output != NULL );
#ifdef RDCSB_GLOBAL_Y
                    if( id == 0 ) {
                        y = RDCSB::output;
                        shared->local_y = y;
                    }
#endif
                    assert( y != NULL );

                    clock_gettime( global_clock_id, &clk_start );
                    shared->time = 0.0;
                    for( unsigned long int i = 0; i < shared->repeat; ++i )
                        bicsb_gaxpy( dss, RDCSB< T >::input, y );
                    clock_gettime( global_clock_id, &clk_stop );
                    //store the elapsed wall-clock time in milliseconds
                    shared->time  = (clk_stop.tv_sec  - clk_start.tv_sec ) * 1000.0;
                    shared->time += (clk_stop.tv_nsec - clk_start.tv_nsec) / 1000000.0;

#ifndef RDCSB_NO_COLLECT
                    collectY( shared );
#endif
                    break;
                default:
                    std::cout << "Thread " << id << ": Error, undefined operation (" << (int)(shared->mode) << ")!" << std::endl;
                    exit( -1 );
                }
                shared->mode = 0;

                //signal end of operation
                pthread_mutex_lock( mutex );
                RDCSB::end( shared->end_mutex, shared->end_cond, shared->sync, shared->P );
            }

            //done
#ifdef RDCSB_GLOBAL_Y
            if( id != 0 )
#endif
                delete [] y;
            return NULL;
        }

        /** Collects the thread-local output vectors into the global one. */
        static void collectY( RDCSB_shared_data< T > *shared ) {

#ifdef RDCSB_GLOBAL_Y
            //FIXME It could be possible to distribute work over all processors
            //instead of P-1 processors, but this requires some extra balancing.
            const unsigned long int s = shared->id;
            if( s == 0 ) return;
#endif

            //do collect items of own block
            for( unsigned long int i = 0; i < shared->output_vector_size; i++ ) {
                assert( RDCSB< T >::output != NULL );
                assert( shared->local_y != NULL );
                RDCSB< T >::output[ shared->output_vector_offset + i ] += shared->local_y[ i ];
            }
        }

#ifndef _NO_LIBNUMA
        /** Overloaded mv call; allocates the output vector using interleaved NUMA memory. */
        virtual T* mv( const T* x ) {
            T* ret = (T*) numa_alloc_interleaved( this->nor * sizeof( T ) );
            for( ULI i = 0; i < this->nor; i++ ) ret[ i ] = this->zero_element;
            zax( x, ret );
            return ret;
        }
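        //Note: memory obtained from numa_alloc_interleaved must be released
        //with numa_free( ptr, size ), not with delete[] or free().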
#endif

        /** Computes z = x * A. */
        virtual void zxa( const T* x, T* z ) {
            zxa( x, z, 1 );
        }

        /** Computes z = x * A, repeated `repeat' times. */
        virtual void zxa( const T* x, T* z, const unsigned long int repeat ) {
            //set all daemon threads to do zxa
            for( unsigned long int i = 0; i < P; i++ ) {
                thread_data[ i ].mode = 3;
                thread_data[ i ].repeat = repeat;
            }

            //set input vector
            RDCSB< T >::input = x;

            //set output vector
            RDCSB< T >::output = z;

            //wake up all daemon threads
            pthread_mutex_lock( &end_mutex );
            pthread_mutex_lock( &mutex );
            pthread_cond_broadcast( &cond );
            pthread_mutex_unlock( &mutex );

            //wait for end of operation
            wait();

            //unset vectors
            RDCSB< T >::input = NULL;
            RDCSB< T >::output = NULL;
        }

        /** Computes z = A * x. */
        virtual void zax( const T* x, T* z ) {
            zax( x, z, 1, 0, NULL );
        }

        /** Computes z = A * x, repeated `repeat' times; optionally reports the maximum thread-local time. */
        virtual void zax( const T* x, T* z, const unsigned long int repeat, const clockid_t clock_id, double *elapsed_time ) {
            //set all daemon threads to do zax
            for( unsigned long int i = 0; i < P; i++ ) {
                thread_data[ i ].mode = 2;
                thread_data[ i ].repeat = repeat;
            }

            //set global clock ID
            global_clock_id = clock_id;

            //set input vector
            RDCSB< T >::input = x;

            //set output vector
            RDCSB< T >::output = z;

            //wake up all daemon threads
            pthread_mutex_lock( &end_mutex );
            pthread_mutex_lock( &mutex );
            pthread_cond_broadcast( &cond );
            pthread_mutex_unlock( &mutex );

            //wait for end of operation
            wait();

            //get elapsed time (maximum over all threads)
            double maxtime = 0.0;
            for( unsigned long int i = 0; i < P; i++ ) {
                const double curtime = thread_data[ i ].time;
                if( curtime > maxtime ) maxtime = curtime;
            }
            if( elapsed_time != NULL )
                *elapsed_time += maxtime;

            //unset vectors
            RDCSB< T >::input = NULL;
            RDCSB< T >::output = NULL;
        }

        /** Has no unique answer for a parallel scheme; issues a warning and ignores the call. */
        virtual void getFirstIndexPair( ULI &i, ULI &j ) {
            std::cerr << "Warning: RDCSB::getFirstIndexPair has no unique answer since it implements a parallel multiplication!\nIgnoring call..." << std::endl;
        }
};

template< typename T > unsigned long int RDCSB< T >::P = 0;

template< typename T > const T* RDCSB< T >::input = NULL;

template< typename T > T* RDCSB< T >::output = NULL;

template< typename T > clockid_t RDCSB< T >::global_clock_id = 0;

#endif

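A minimal usage sketch (not part of the library source). The driver below assumes the Triplet( i, j, value ) constructor and the zax() signature shown above, and that RDCSB_NO_COLLECT is left undefined so that results are collected into z:

#include "RDCSB.hpp"

int main() {
    //assemble a small 2 x 2 diagonal test matrix
    std::vector< Triplet< double > > triplets;
    triplets.push_back( Triplet< double >( 0, 0, 1.0 ) );
    triplets.push_back( Triplet< double >( 1, 1, 2.0 ) );

    //construction pins one daemon thread per available core
    RDCSB< double > A( triplets, 2, 2, 0.0 );

    double x[ 2 ] = { 1.0, 1.0 };
    double z[ 2 ] = { 0.0, 0.0 };

    //z = A * x; with RDCSB_NO_COLLECT defined this would only time the SpMV
    A.zax( x, z );

    std::cout << z[ 0 ] << " " << z[ 1 ] << std::endl; //expected: 1 2
    return 0;
}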