43 #include "SparseMatrix.hpp"
44 #include "Hilbert.hpp"
66 #define RDScheme_NO_COLLECT
70 template<
typename T >
151 pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond,
152 size_t *_sync,
size_t *_end_sync,
153 size_t _ovsize,
size_t _ovoffset ):
162 template<
typename T,
typename DS >
220 for(
size_t i=0; i<
P; i++ )
224 pthread_mutex_lock( &
mutex );
225 pthread_cond_broadcast( &
cond );
226 pthread_mutex_unlock( &
mutex );
229 for(
size_t i=0; i<
P; i++ )
230 pthread_join(
threads[ i ], NULL );
235 pthread_mutex_destroy( &
mutex );
236 pthread_cond_destroy( &
cond );
262 numa_set_localalloc();
271 size_t *nzb =
new size_t [ this->
m() ];
272 for(
size_t i=0; i<
m; i++ ) nzb[ i ] = 0;
279 pthread_mutex_init( &
mutex, NULL );
280 pthread_cond_init ( &
cond, NULL );
282 pthread_cond_init ( &
end_cond, NULL );
289 for(
size_t i=0; i<
P; i++ ) {
291 thread_data[ i ] =
RDScheme_shared_data<T>( i,
P, &
input, nzb, &
mutex, &
cond, &
end_mutex, &
end_cond, &
sync, &
end_sync, -1, -1 );
295 CPU_SET ( i, &mask );
305 pthread_attr_init( &attr );
307 pthread_attr_setaffinity_np( &attr,
sizeof( cpu_set_t ), &mask );
311 pthread_attr_destroy( &attr );
323 pthread_mutex_lock( mutex );
327 pthread_cond_signal( cond );
330 pthread_mutex_unlock( mutex );
335 pthread_mutex_lock( mutex );
339 pthread_cond_broadcast( cond );
341 pthread_cond_wait( cond, mutex );
342 pthread_mutex_unlock( mutex );
349 const size_t id = shared->
id;
350 const size_t P = shared->
P;
353 pthread_cond_t *
cond = shared->
cond;
357 pthread_getaffinity_np( pthread_self(),
sizeof( cpu_set_t ), &mask );
360 if( !CPU_ISSET(
id, &mask ) ) {
361 std::cerr <<
"Incorrect pinning for thread " <<
id <<
"!" << std::endl;
364 for(
size_t s=0; s<
P; s++ ) {
365 if( s==
id )
continue;
366 if( CPU_ISSET( s, &mask ) ) {
367 std::cerr <<
"Thread " <<
id <<
" mask is larger than one core" <<
" (" << s <<
" is set)!" << std::endl;
376 const size_t blocksize = (nnz %
P) > 0 ? nnz / P + 1 : nnz / P;
377 for(
size_t i=0; i<
nnz; i++ ) {
378 const unsigned long int currow = (*(shared->
original))[ i ].i();
379 const unsigned long int curcol = (*(shared->
original))[ i ].j();
380 if( currow >=
id * blocksize && currow < (
id + 1) * blocksize )
381 shared->
nzb[ currow ]++;
382 if( currow > m ) m = currow;
383 if( curcol > n ) n = curcol;
394 const size_t nnz_target = nnz /
P;
398 for(
unsigned long int i=0; i<
m; i++ ) cursum += shared->
nzb[ i ];
399 assert( cursum == nnz );
403 size_t start,
end, k = 0;
406 if(
id == 0 ) start = 0;
408 for(
size_t i = 0; i <
m; i++ ) {
409 cursum += shared->
nzb[ i ];
410 if( cursum >= nnz_target ) {
411 if( k ==
id ) end = i + 1;
412 if(k+1==
id ) start = i + 1;
418 if( start == static_cast< size_t >(-1) ) start =
m;
419 if( end == static_cast< size_t >(-1) ) end =
m;
421 if(
id == P-1 ) end =
m;
429 std::vector< Triplet< T > > local;
430 for(
size_t i = 0; i < static_cast< size_t >(
nnz); i++ ) {
431 const size_t currow = (*(shared->
original))[ i ].i();
432 if( currow >= start && currow < end )
442 DS dss( local, m, n, 0 );
445 shared->
bytes = dss.bytesUsed();
449 #ifdef RDScheme_GLOBAL_Y
458 #ifdef RDScheme_GLOBAL_Y
467 pthread_mutex_lock( mutex );
472 struct timespec clk_start, clk_stop;
473 pthread_cond_wait( cond, mutex );
474 pthread_mutex_unlock( mutex );
476 if( shared->
mode == 4 )
break;
482 switch( shared->
mode ) {
484 assert( p_input != NULL );
485 assert( p_output != NULL );
486 #ifdef RDScheme_GLOBAL_Y
496 for(
unsigned long int i=0; i<shared->
repeat; ++i )
499 shared->
time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
500 shared->
time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;
502 #ifndef RDScheme_NO_COLLECT
507 assert( p_input != NULL );
508 assert( p_output != NULL );
509 #ifdef RDScheme_GLOBAL_Y
519 for(
unsigned long int i=0; i<shared->
repeat; ++i )
522 shared->
time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
523 shared->
time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;
525 #ifndef RDScheme_NO_COLLECT
530 std::cout <<
"Thread " <<
id <<
": Error, undefined operation (" << shared->
mode <<
")!" << std::endl;
536 pthread_mutex_lock( mutex );
541 #ifdef RDScheme_GLOBAL_Y
556 #ifdef RDScheme_GLOBAL_Y
559 const size_t s = shared->
id;
567 assert( p_output != NULL );
568 assert( shared->
local_y != NULL );
576 virtual T*
mv(
const T* x ) {
577 T* ret = (T*) numa_alloc_interleaved( this->
nor *
sizeof( T ) );
585 virtual void zxa(
const T* x, T* z ) {
590 virtual void zxa(
const T* x, T* z,
const unsigned long int repeat ) {
592 for(
size_t i=0; i<
P; i++ ) {
605 pthread_mutex_lock( &
mutex );
606 pthread_cond_broadcast( &
cond );
607 pthread_mutex_unlock( &
mutex );
618 virtual void zax(
const T* x, T* z ) {
619 zax( x, z, 1, 0, NULL );
623 virtual void zax(
const T* x, T* z,
const unsigned long int repeat,
const clockid_t clock_id,
double *elapsed_time ) {
625 for(
size_t i=0; i<
P; i++ ) {
641 pthread_mutex_lock( &
mutex );
642 pthread_cond_broadcast( &
cond );
643 pthread_mutex_unlock( &
mutex );
649 double maxtime = 0.0;
650 for(
size_t i=0; i<
P; i++ ) {
652 if( curtime > maxtime ) maxtime = curtime;
654 if( elapsed_time != NULL )
655 *elapsed_time += maxtime;
665 for(
size_t s = 0; s <
P; ++s )
675 std::cerr <<
"Warning: RDScheme::getFirstIndexPair has no unique answer since it implements a parallel multiplication!\nIgnoring call..." << std::endl;
ULI nnz
Number of non-zeros.
Definition: SparseMatrix.hpp:58
pthread_cond_t end_cond
Wait for end mechanism: condition.
Definition: RDScheme.hpp:197
virtual void getFirstIndexPair(ULI &i, ULI &j)
Function disabled for parallel schemes!
Definition: RDScheme.hpp:674
static const T * input
Input vector.
Definition: RDScheme.hpp:173
size_t P
Total number of processors.
Definition: RDScheme.hpp:79
virtual ~RDScheme()
Base destructor.
Definition: RDScheme.hpp:218
pthread_cond_t * cond
Condition used for synchronisation.
Definition: RDScheme.hpp:103
pthread_mutex_t mutex
Stop/continue mechanism: mutex.
Definition: RDScheme.hpp:188
size_t output_vector_size
Length of the local output vector.
Definition: RDScheme.hpp:118
T * local_y
Pointer to the local output vector.
Definition: RDScheme.hpp:124
pthread_t * threads
PThreads associated with this data structure.
Definition: RDScheme.hpp:179
static size_t P
Number of threads to fire up.
Definition: RDScheme.hpp:170
virtual void zxa(const T *x, T *z, const unsigned long int repeat)
Definition: RDScheme.hpp:590
RDScheme_shared_data< T > * thread_data
Array of initial thread data.
Definition: RDScheme.hpp:182
unsigned long int cores() const
The number of available cores.
Definition: MachineInfo.cpp:77
pthread_mutex_t * mutex
Mutex used for synchronisation.
Definition: RDScheme.hpp:100
size_t sync
Used for synchronising threads.
Definition: RDScheme.hpp:200
virtual unsigned long int m()
Queries the number of rows this matrix contains.
Definition: SparseMatrix.hpp:107
Full parallel row-distributed SpMV, based on CSB (Morton curve + Cilk) and PThreads.
Definition: RDScheme.hpp:163
static void end(pthread_mutex_t *mutex, pthread_cond_t *cond, size_t *sync, const size_t P)
End synchronisation code.
Definition: RDScheme.hpp:322
static T * output
Output vector.
Definition: RDScheme.hpp:176
pthread_cond_t * end_cond
Condition used for end sync.
Definition: RDScheme.hpp:109
std::vector< Triplet< T > > * original
Pointer to the original (global) vector of nonzeroes; each thread scans it and keeps only the nonzeroes in its own row range.
Definition: RDScheme.hpp:88
static const MachineInfo & getInstance()
Gets a singleton instance.
Definition: MachineInfo.cpp:38
virtual T * mv(const T *x)
Overloaded mv call; allocates the output vector using numa_alloc_interleaved.
Definition: RDScheme.hpp:576
size_t * nzb
Will store rowsums.
Definition: RDScheme.hpp:91
RDScheme_shared_data(size_t _id, size_t _P, std::vector< Triplet< double > > *_original, size_t *_nzb, pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond, size_t *_sync, size_t *_end_sync, size_t _ovsize, size_t _ovoffset)
Parameterised constructor.
Definition: RDScheme.hpp:148
RDScheme(const std::string file, T zero)
Base constructor.
Definition: RDScheme.hpp:208
pthread_mutex_t * end_mutex
Mutex used for end sync.
Definition: RDScheme.hpp:106
static void * thread(void *data)
SPMD code for each thread involved with parallel SpMV multiplication.
Definition: RDScheme.hpp:346
static void collectY(RDScheme_shared_data< T > *shared)
Reduces a distributed output vector set into a single contiguous output vector at process 0...
Definition: RDScheme.hpp:554
size_t output_vector_offset
Offset of the local output vector compared to global indices.
Definition: RDScheme.hpp:121
void loadFromFile(const std::string file, const T zero=0)
Function which loads a matrix from a matrix market file.
Definition: SparseMatrix.hpp:89
Interface common to all sparse matrix storage schemes.
Definition: SparseMatrix.hpp:46
double time
Will store local timing.
Definition: RDScheme.hpp:94
ULI noc
Number of columns.
Definition: SparseMatrix.hpp:55
size_t bytes
Will store memory use.
Definition: RDScheme.hpp:97
unsigned long int repeat
How many times to repeat the operation selected by `mode` (only used for modes 2 and 3).
Definition: RDScheme.hpp:85
size_t * end_sync
Counter used for end sync.
Definition: RDScheme.hpp:115
virtual void load(std::vector< Triplet< T > > &input, const ULI m, const ULI n, const T zero)
Loads a sparse matrix from an input set of triplets.
Definition: RDScheme.hpp:256
size_t id
Thread ID.
Definition: RDScheme.hpp:76
size_t * sync
Counter used for synchronisation.
Definition: RDScheme.hpp:112
static void synchronise(pthread_mutex_t *mutex, pthread_cond_t *cond, size_t *sync, const size_t P)
Synchronises all threads.
Definition: RDScheme.hpp:334
virtual void zax(const T *x, T *z, const unsigned long int repeat, const clockid_t clock_id, double *elapsed_time)
See SparseMatrix::zax.
Definition: RDScheme.hpp:623
pthread_cond_t cond
Stop/continue mechanism: condition.
Definition: RDScheme.hpp:191
virtual size_t bytesUsed()
Definition: RDScheme.hpp:663
ULI nor
Number of rows.
Definition: SparseMatrix.hpp:52
void wait()
Lets the calling thread wait for the end of the SpMV multiply.
Definition: RDScheme.hpp:242
T zero_element
The element considered to be zero.
Definition: SparseMatrix.hpp:63
RDScheme(std::vector< Triplet< T > > &input, ULI m, ULI n, T zero)
Base constructor.
Definition: RDScheme.hpp:213
unsigned char mode
0 undef, 1 init, 2 zax, 3 zxa, 4 exit
Definition: RDScheme.hpp:82
RDScheme_shared_data()
Default constructor.
Definition: RDScheme.hpp:127
size_t end_sync
Used for construction end signal.
Definition: RDScheme.hpp:203
virtual unsigned long int n()
Queries the number of columns this matrix contains.
Definition: SparseMatrix.hpp:115
A single triplet value.
Definition: Triplet.hpp:52
static clockid_t global_clock_id
Clock type used for thread-local timing.
Definition: RDScheme.hpp:185
virtual void zax(const T *x, T *z)
See SparseMatrix::zax.
Definition: RDScheme.hpp:618
Shared data for RDScheme threads.
Definition: RDScheme.hpp:71
pthread_mutex_t end_mutex
Wait for end mechanism: mutex.
Definition: RDScheme.hpp:194
virtual void zxa(const T *x, T *z)
Definition: RDScheme.hpp:585