40 #include "SparseMatrix.hpp"
41 #include "Matrix2HilbertCoordinates.hpp"
43 #include "csb/utility.h"
44 #include "csb/triple.h"
46 #include "csb/bicsb.h"
67 #define RDCSB_NO_COLLECT
70 template<
typename T >
85 std::vector< Triplet< T > > *original;
88 unsigned long int *
nzb;
93 pthread_mutex_t* mutex;
97 pthread_mutex_t* end_mutex;
99 pthread_cond_t* end_cond;
101 unsigned long int *sync, *end_sync;
103 unsigned long int output_vector_size;
105 unsigned long int output_vector_offset;
111 mutex( NULL ), cond( NULL ), end_mutex( NULL ), end_cond( NULL ),
112 sync( NULL ), end_sync( NULL ),
113 output_vector_size( -1 ), output_vector_offset( -1 ) {}
118 unsigned long int *_nzb,
119 pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond,
120 unsigned long int *_sync,
unsigned long int *_end_sync,
121 unsigned long int _ovsize,
unsigned long int _ovoffset ):
122 id( _id ), P( _P ),
mode( 1 ),
time( 0 ),
repeat( 1 ), original( _original ),
nzb( _nzb ),
123 mutex( _mutex ), cond( _cond ), end_mutex( _end_mutex ), end_cond( _end_cond ),
124 sync( _sync ), end_sync( _end_sync ),
125 output_vector_size( _ovsize ), output_vector_offset( _ovoffset ), local_y( NULL ) {}
131 template<
typename T >
139 static unsigned long int P;
176 RDCSB(
const std::string file, T zero ) {
186 for(
unsigned long int i=0; i<
P; i++ )
190 pthread_mutex_lock( &
mutex );
191 pthread_cond_broadcast( &
cond );
192 pthread_mutex_unlock( &
mutex );
195 for(
unsigned long int i=0; i<
P; i++ )
196 pthread_join(
threads[ i ], NULL );
201 pthread_mutex_destroy( &
mutex );
202 pthread_cond_destroy( &
cond );
216 numa_set_localalloc();
224 unsigned long int *nzb =
new unsigned long int [ this->
m() ];
225 for(
unsigned long int i=0; i<
m; i++ ) nzb[ i ] = 0;
232 pthread_mutex_init( &
mutex, NULL );
233 pthread_cond_init ( &
cond, NULL );
235 pthread_cond_init ( &
end_cond, NULL );
242 for(
unsigned long int i=0; i<
P; i++ ) {
244 thread_data[ i ] =
RDCSB_shared_data<T>( i,
P, &
input, nzb, &
mutex, &
cond, &
end_mutex, &
end_cond, &
sync, &
end_sync, -1, -1 );
248 CPU_SET ( i, &mask );
258 pthread_attr_init( &attr );
260 pthread_attr_setaffinity_np( &attr,
sizeof( cpu_set_t ), &mask );
264 pthread_attr_destroy( &attr );
274 static void end( pthread_mutex_t*
mutex, pthread_cond_t*
cond,
unsigned long int *
sync,
const unsigned long int P ) {
275 pthread_mutex_lock( mutex );
278 pthread_cond_signal( cond );
281 pthread_mutex_unlock( mutex );
284 static void synchronise( pthread_mutex_t* mutex, pthread_cond_t* cond,
unsigned long int *sync,
const unsigned long int P ) {
285 pthread_mutex_lock( mutex );
289 pthread_cond_broadcast( cond );
291 pthread_cond_wait( cond, mutex );
292 pthread_mutex_unlock( mutex );
295 static void* thread(
void *data ) {
298 const unsigned long int id = shared->id;
299 const unsigned long int P = shared->P;
300 const unsigned long int nnz = shared->original->size();
301 pthread_mutex_t *mutex = shared->mutex;
302 pthread_cond_t *cond = shared->cond;
306 pthread_getaffinity_np( pthread_self(),
sizeof( cpu_set_t ), &mask );
307 if( !CPU_ISSET(
id, &mask ) ) {
308 std::cerr <<
"Incorrect pinning for thread " <<
id <<
"!" << std::endl;
311 for(
unsigned long int s=0; s<
P; s++ ) {
312 if( s==
id )
continue;
313 if( CPU_ISSET( s, &mask ) ) {
314 std::cerr <<
"Thread " <<
id <<
" mask is larger than one core" <<
" (" << s <<
" is set)!" << std::endl;
323 const unsigned long int blocksize = (nnz %
P) > 0 ? nnz / P + 1 : nnz / P;
324 for(
unsigned long int i=0; i<
nnz; i++ ) {
325 const unsigned long int currow = (*(shared->original))[ i ].i();
326 const unsigned long int curcol = (*(shared->original))[ i ].j();
327 if( currow >=
id * blocksize && currow < (
id + 1) * blocksize )
328 shared->
nzb[ currow ]++;
329 if( currow > m ) m = currow;
330 if( curcol > n ) n = curcol;
338 RDCSB::synchronise( mutex, cond, shared->sync, shared->P );
341 const unsigned long int nnz_target = nnz /
P;
342 unsigned long int cursum = 0;
345 for(
unsigned long int i=0; i<
m; i++ ) cursum += shared->
nzb[ i ];
346 assert( cursum == nnz );
350 unsigned long int start, end, k = 0;
352 if(
id == 0 ) start = 0;
353 for(
unsigned long int i=0; i<
m; i++ ) {
354 cursum += shared->
nzb[ i ];
355 if( cursum >= nnz_target ) {
356 if( k ==
id ) end = i + 1;
357 if(k+1==
id ) start = i + 1;
363 if(
id == P-1 ) end =
m;
364 shared->output_vector_size = end - start;
365 shared->output_vector_offset = start;
366 assert (shared->output_vector_size <= m );
367 assert( shared->output_vector_offset + shared->output_vector_size <= m );
370 std::vector< Triplet< T > > local;
371 for(
unsigned long int i=0; i<
nnz; i++ ) {
372 const unsigned long int currow = (*(shared->original))[ i ].i();
373 if( currow >= start && currow < end )
376 (*(shared->original))[ i ].j(),
377 (*(shared->original))[ i ].value )
380 const unsigned long int local_nnz = local.size();
381 m = shared->output_vector_size;
384 unsigned int *row =
new unsigned int[ local_nnz ];
385 unsigned int *col =
new unsigned int[ local_nnz ];
386 double *val =
new double[ local_nnz ];
387 for(
unsigned long int i=0; i<local_nnz; i++ ) {
388 row[ i ] = local[ i ].i();
389 col[ i ] = local[ i ].j();
390 val[ i ] = local[ i ].value;
395 BiCsb< T, unsigned > dss( local_nnz, m, n, row, col, val, 0, 0 );
404 #ifdef RDCSB_GLOBAL_Y
407 y =
new T[ shared->output_vector_size ];
408 for(
unsigned long int i=0; i<shared->output_vector_size; i++ )
410 #ifdef RDCSB_GLOBAL_Y
419 pthread_mutex_lock( mutex );
420 RDCSB::end( shared->end_mutex, shared->end_cond, shared->end_sync, shared->P );
424 struct timespec clk_start, clk_stop;
425 pthread_cond_wait( cond, mutex );
426 pthread_mutex_unlock( mutex );
428 if( shared->
mode == 4 )
break;
430 switch( shared->
mode ) {
434 #ifdef RDCSB_GLOBAL_Y
441 std::cerr <<
"CSB interface to z=x*A has not been implemented!" << std::endl;
443 #ifndef RDCSB_NO_COLLECT
450 #ifdef RDCSB_GLOBAL_Y
460 for(
unsigned long int i=0; i<shared->
repeat; ++i )
463 shared->
time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
464 shared->
time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;
466 #ifndef RDCSB_NO_COLLECT
471 std::cout <<
"Thread " <<
id <<
": Error, undefined operation (" << shared->
mode <<
")!" << std::endl;
477 pthread_mutex_lock( mutex );
478 RDCSB::end( shared->end_mutex, shared->end_cond, shared->sync, shared->P );
482 #ifdef RDCSB_GLOBAL_Y
491 #ifdef RDCSB_GLOBAL_Y
494 const unsigned long int s = shared->id;
499 for(
unsigned long int i = 0; i < shared->output_vector_size; i++ ) {
501 assert( shared->local_y != NULL );
502 RDCSB<T>::output[ shared->output_vector_offset + i ] += shared->local_y[ i ];
508 virtual T*
mv(
const T* x ) {
509 T* ret = (T*) numa_alloc_interleaved( this->
nor *
sizeof( T ) );
516 virtual void zxa(
const T* x, T* z ) {
520 virtual void zxa(
const T* x, T* z,
const unsigned long int repeat ) {
522 for(
unsigned long int i=0; i<
P; i++ ) {
535 pthread_mutex_lock( &mutex );
536 pthread_cond_broadcast( &cond );
537 pthread_mutex_unlock( &mutex );
547 virtual void zax(
const T* x, T* z ) {
548 zax( x, z, 1, 0, NULL );
551 virtual void zax(
const T* x, T* z,
const unsigned long int repeat,
const clockid_t clock_id,
double *elapsed_time ) {
553 for(
unsigned long int i=0; i<
P; i++ ) {
569 pthread_mutex_lock( &mutex );
570 pthread_cond_broadcast( &cond );
571 pthread_mutex_unlock( &mutex );
577 double maxtime = 0.0;
578 for(
unsigned long int i=0; i<
P; i++ ) {
580 if( curtime > maxtime ) maxtime = curtime;
582 if( elapsed_time != NULL )
583 *elapsed_time += maxtime;
591 std::cerr <<
"Warning: RDCSB::getFirstIndexPair has no unique answer since it implements a parallel multiplication!\nIgnoring call..." << std::endl;
ULI nnz
Number of non-zeros.
Definition: SparseMatrix.hpp:58
static T * output
Output vector.
Definition: RDCSB.hpp:145
virtual void getFirstIndexPair(ULI &i, ULI &j)
Returns the first nonzero index, per reference.
Definition: RDCSB.hpp:590
unsigned long int * nzb
Will store rowsums.
Definition: RDCSB.hpp:88
Shared data for RDCSB threads.
Definition: RDCSB.hpp:71
unsigned long int sync
Used for synchronising threads.
Definition: RDCSB.hpp:169
static unsigned long int P
Number of threads to fire up.
Definition: RDCSB.hpp:139
RDCSB_shared_data< T > * thread_data
array of initial thread data
Definition: RDCSB.hpp:151
static clockid_t global_clock_id
Clock type used for thread-local timing.
Definition: RDCSB.hpp:154
unsigned long int cores() const
The number of available cores.
Definition: MachineInfo.cpp:77
double time
Will store local timing.
Definition: RDCSB.hpp:91
virtual unsigned long int m()
Queries the number of rows this matrix contains.
Definition: SparseMatrix.hpp:107
unsigned char mode
0 undef, 1 init, 2 zax, 3 zxa, 4 exit
Definition: RDCSB.hpp:80
unsigned long int end_sync
Used for construction end signal.
Definition: RDCSB.hpp:172
static const MachineInfo & getInstance()
Gets a singleton instance.
Definition: MachineInfo.cpp:38
RDCSB_shared_data(unsigned long int _id, unsigned long int _P, std::vector< Triplet< double > > *_original, unsigned long int *_nzb, pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond, unsigned long int *_sync, unsigned long int *_end_sync, unsigned long int _ovsize, unsigned long int _ovoffset)
Recommended constructor.
Definition: RDCSB.hpp:116
RDCSB_shared_data()
Base constructor.
Definition: RDCSB.hpp:110
pthread_t * threads
p_threads associated to this data strcuture
Definition: RDCSB.hpp:148
void loadFromFile(const std::string file, const T zero=0)
Function which loads a matrix from a matrix market file.
Definition: SparseMatrix.hpp:89
Interface common to all sparse matrix storage schemes.
Definition: SparseMatrix.hpp:46
ULI noc
Number of columns.
Definition: SparseMatrix.hpp:55
static const T * input
Input vector.
Definition: RDCSB.hpp:142
virtual T * mv(const T *x)
Overloaded mv call; allocates output vector using numa_interleaved.
Definition: RDCSB.hpp:508
pthread_cond_t cond
Stop/continue mechanism: condition.
Definition: RDCSB.hpp:160
ULI nor
Number of rows.
Definition: SparseMatrix.hpp:52
Full parallel row-distributed SpMV, based on CSB (BlockCRS + Morton curve + Cilk) and PThreads...
Definition: RDCSB.hpp:132
unsigned long int repeat
how many times to repeat the operation set in `mode' (above, only for 2 and 3)
Definition: RDCSB.hpp:83
T zero_element
The element considered to be zero.
Definition: SparseMatrix.hpp:63
virtual void load(std::vector< Triplet< T > > &input, const ULI m, const ULI n, const T zero)
Function reading in from std::vector< Triplet< T > > format.
Definition: RDCSB.hpp:211
pthread_mutex_t mutex
Stop/continue mechanism: mutex.
Definition: RDCSB.hpp:157
pthread_cond_t end_cond
Wait for end mechanism: condition.
Definition: RDCSB.hpp:166
virtual unsigned long int n()
Queries the number of columns this matrix contains.
Definition: SparseMatrix.hpp:115
A single triplet value.
Definition: Triplet.hpp:52
pthread_mutex_t end_mutex
Wait for end mechanism: mutex.
Definition: RDCSB.hpp:163