50 #include "SparseMatrix.hpp"
51 #include "Matrix2HilbertCoordinates.hpp"
53 #include "MachineInfo.hpp"
76 #define RDBH_NO_COLLECT
80 template<
typename T >
175 pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond,
176 size_t *_sync,
size_t *_end_sync,
177 size_t _ovsize,
size_t _ovoffset,
178 const T **
const _in, T **
const _out
191 int rdbh_uli_compare(
const void *a,
const void *b ) {
192 return ( *(
unsigned long int*)a - *(
unsigned long int*)b );
200 template<
typename T,
class MatrixType = FBICRS< T > >
260 if( _p_translate == NULL ) {
263 for(
size_t i = 0; i <
P; ++i ) {
265 const size_t num_threads_times_two = 120;
272 if( i >= num_threads_times_two ) {
274 core = (i-num_threads_times_two) / 2;
279 const size_t hwthread = i % 2 + offset;
296 RDBHilbert(
const std::string file, T zero = 0, std::vector< size_t > *_p_translate = NULL ) {
310 for(
size_t i = 0; i <
P; i++ ) {
315 pthread_mutex_lock( &
mutex );
316 pthread_cond_broadcast( &
cond );
317 pthread_mutex_unlock( &
mutex );
320 for(
size_t i = 0; i <
P; i++ ) {
321 pthread_join(
threads[ i ], NULL );
327 pthread_mutex_destroy( &
mutex );
328 pthread_cond_destroy( &
cond );
342 numa_set_localalloc();
364 size_t *nzb =
new size_t [ this->
m() ];
365 for( ULI i = 0; i <
m; i++ ) nzb[ i ] = 0;
372 pthread_mutex_init( &
mutex, NULL );
373 pthread_cond_init ( &
cond, NULL );
375 pthread_cond_init ( &
end_cond, NULL );
382 for(
size_t i = 0; i <
P; i++ ) {
402 pthread_attr_init( &attr );
404 pthread_attr_setaffinity_np( &attr,
sizeof( cpu_set_t ), &mask );
408 pthread_attr_destroy( &attr );
416 for(
size_t i = 0; i <
P; i++ ) {
419 std::cout <<
"RDBHilbert: fill-in is " << fillIn <<
", which is +" << 100.0*((double)fillIn)/((double)
input.size()) <<
"%" << std::endl;
427 pthread_mutex_lock( mutex );
430 pthread_cond_signal( cond );
433 pthread_mutex_unlock( mutex );
438 pthread_mutex_lock( mutex );
442 pthread_cond_broadcast( cond );
444 pthread_cond_wait( cond, mutex );
446 pthread_mutex_unlock( mutex );
453 const size_t id = shared->
id;
454 const size_t P = shared->
P;
457 pthread_cond_t *
cond = shared->
cond;
462 pthread_getaffinity_np( pthread_self(),
sizeof( cpu_set_t ), &mask );
463 unsigned long int MyMask = ULONG_MAX;
464 for(
size_t s = 0; s <
P; s++ ) {
465 if( CPU_ISSET( s, &mask ) ) {
466 if( MyMask == ULONG_MAX ) {
469 std::cerr <<
"Thread " <<
id <<
" mask is larger than one core" <<
" (" << MyMask <<
" and " << s <<
" are both set)!" << std::endl;
479 const size_t blocksize = (nnz %
P) > 0 ? nnz / P + 1 : nnz / P;
480 for(
size_t i = 0; i <
nnz; i++ ) {
481 const ULI currow = (*(shared->
original))[ i ].i();
482 const ULI curcol = (*(shared->
original))[ i ].j();
483 if( currow >=
id * blocksize && currow < (
id + 1) * blocksize ) {
484 shared->
nzb[ currow ]++;
486 if( currow > m ) m = currow;
487 if( curcol > n ) n = curcol;
498 const size_t nnz_target = nnz /
P;
502 for( ULI i=0; i<
m; i++ ) cursum += shared->
nzb[ i ];
503 assert( cursum == nnz );
507 unsigned long int start,
end, k = 0;
508 start = end = ULONG_MAX;
509 if(
id == 0 ) start = 0;
511 if(
id == 0 ) std::cout <<
"Breakpoints: ";
513 for( ULI i = 0; i <
m; i++ ) {
514 cursum += shared->
nzb[ i ];
515 if( cursum >= nnz_target ) {
517 if(
id == 0 ) std::cout << (i+1) <<
" ";
519 if( k ==
id ) end = i + 1;
520 if(k+1==
id ) start = i + 1;
525 if( start == ULONG_MAX ) start =
m;
526 if(
id == P-1 || end == ULONG_MAX ) end =
m;
528 if(
id == 0 ) std::cout << std::endl;
530 std::cout <<
"Thread " <<
id <<
" has rows " << start <<
" to " << end << std::endl;
538 std::vector< Triplet< T > > local;
539 for(
size_t i=0; i<
nnz; i++ ) {
540 const ULI currow = (*(shared->
original))[ i ].i();
541 if( currow >= start && currow < end )
548 const size_t local_nnz = local.size();
553 BigInt maxh( 0, 0 ), minh( ULONG_MAX, ULONG_MAX );
555 for(
size_t i=0; i<local_nnz; i++ ) {
557 const BigInt cur( h1, h2 );
559 if( maxh < cur ) maxh = cur;
560 if( minh > cur ) minh = cur;
561 assert( t2h[ i ].high == h1 );
562 assert( t2h[ i ].low == h2 );
566 qsort( t2h, local_nnz,
sizeof(
BigInt ), bigint_compare );
567 assert( local_nnz == 0 || t2h[ local_nnz - 1 ] == maxh );
568 assert( local_nnz == 0 || t2h[ 0 ] == minh );
572 std::map< BigInt, unsigned long int > h2b;
573 std::vector< std::vector< Triplet< T > > > beta;
576 if( local_nnz > 0 ) {
583 unsigned long int c = 0, size = 1;
587 for(
unsigned long int i = 1; i < local_nnz; ++i, ++size ) {
589 if( cur != prev_h ) {
592 beta.back().reserve( size );
602 beta.back().reserve( size );
607 unsigned long int smin_m = ULONG_MAX;
608 unsigned long int smin_n = ULONG_MAX;
609 unsigned long int smax_m, smax_n;
611 unsigned long int *ms =
new unsigned long int[ h2b.size() ];
612 unsigned long int *ns =
new unsigned long int[ h2b.size() ];
613 unsigned long int *minms =
new unsigned long int[ h2b.size() ];
614 unsigned long int *minns =
new unsigned long int[ h2b.size() ];
615 for(
size_t i = 0; i < h2b.size(); i++ ) {
618 minms[ i ] = ULONG_MAX;
619 minns[ i ] = ULONG_MAX;
623 for(
size_t i = 0; i < local_nnz; i++ ) {
624 const ULI row = local[ i ].i();
625 const ULI col = local[ i ].j();
627 const BigInt cur( h1, h2 );
628 assert( cur >= minh );
629 assert( cur <= maxh );
633 if( beta[ h2b[ cur ] ].size() > 1 ) {
635 beta[ h2b[ cur ] ].back().i() /
max_m,
636 beta[ h2b[ cur ] ].back().j() /
max_n, h1, h2 );
637 assert( cur ==
BigInt( h1, h2 ) );
638 assert( beta[ h2b[ cur ] ].back().i() /
max_m == row /
max_m );
639 assert( beta[ h2b[ cur ] ].back().j() /
max_n == col /
max_n );
642 beta[ h2b[ cur ] ].push_back( toAdd );
643 if( row > ms[ h2b[ cur ] ] ) ms[ h2b[ cur ] ] = row;
644 if( col > ns[ h2b[ cur ] ] ) ns[ h2b[ cur ] ] = col;
645 if( row < minms[ h2b[ cur ] ] ) minms[ h2b[ cur ] ] = row;
646 if( col < minns[ h2b[ cur ] ] ) minns[ h2b[ cur ] ] = col;
647 if( row < smin_m ) smin_m = row;
648 if( col < smin_n ) smin_n = col;
649 if( row > smax_m ) smax_m = row;
650 if( col > smax_n ) smax_n = col;
657 for(
size_t i = 0; i < beta.size(); i++ ) {
658 cursum += beta[i].size();
660 assert( cursum == local_nnz );
661 std::cout <<
"Thread " << shared->
id <<
": " << smin_m <<
"," << smax_m <<
" times " << smin_n <<
"," << smax_n <<
" holding " << cursum <<
" nonzeroes." << std::endl;
665 for(
size_t i = 0; i < beta.size(); ++i ) {
666 const ULI row_br = beta[i][0].i() /
max_m;
667 const ULI col_br = beta[i][0].j() /
max_n;
668 for(
size_t k = 1; k < beta[i].size(); ++k ) {
669 assert( beta[i][k].i() /
max_m == row_br );
670 assert( beta[i][k].j() /
max_n == col_br );
683 MatrixType dss( beta, smax_m - smin_m, n );
687 shared->
bytes = dss.bytesUsed();
688 shared->
fillIn = dss.fillIn;
710 pthread_mutex_lock( mutex );
715 struct timespec clk_start, clk_stop;
716 pthread_cond_wait( cond, mutex );
717 pthread_mutex_unlock( mutex );
719 if( shared->
mode == 4 )
break;
721 switch( shared->
mode ) {
770 if( shared->
id != 0) {
780 assert( *(shared->
input) != NULL );
781 assert( *(shared->
output) != NULL );
792 for(
size_t i = 0; i < shared->
repeat; i++ ) {
793 dss.zxa( *(shared->
input), y );
796 shared->
time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
797 shared->
time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;
799 #ifndef RDBH_NO_COLLECT
804 assert( *(shared->
input) != NULL );
805 assert( *(shared->
output) != NULL );
819 for(
size_t i = 0; i < shared->
repeat; i++ ) {
820 dss.zax( *(shared->
input), y );
826 shared->
time = (clk_stop.tv_sec-clk_start.tv_sec)*1000;
827 shared->
time += (clk_stop.tv_nsec-clk_start.tv_nsec)/1000000.0;
829 #ifndef RDBH_NO_COLLECT
834 std::cout <<
"Thread " <<
id <<
": Error, undefined operation (" << shared->
mode <<
")!" << std::endl;
840 pthread_mutex_lock( mutex );
858 const size_t s = shared->
id;
864 assert( *(shared->
output) != NULL );
865 assert( shared->
local_y != NULL );
887 virtual T*
mv(
const T* x ) {
890 size_t allocsize = (this->
nor + 1) *
sizeof( T );
894 T* ret = (T*) numa_alloc_interleaved( allocsize );
896 T* ret = (T*) _mm_malloc( allocsize, 64 );
899 for( ULI i = 0; i < this->
nor; i++ ) {
914 virtual void zxa(
const T* x, T* z ) {
919 virtual void zxa(
const T* x, T* z,
const size_t repeat ) {
921 for(
size_t i=0; i<
P; i++ ) {
934 pthread_mutex_lock( &
mutex );
935 pthread_cond_broadcast( &
cond );
936 pthread_mutex_unlock( &
mutex );
978 for(
size_t i=0; i<
P; i++ ) {
984 pthread_mutex_lock( &
mutex );
985 pthread_cond_broadcast( &
cond );
986 pthread_mutex_unlock( &
mutex );
993 virtual void zax(
const T* x, T* z ) {
994 zax( x, z, 1, 0, NULL );
998 virtual void zax(
const T* x, T* z,
const size_t repeat,
const clockid_t clock_id,
double *elapsed_time ) {
1001 for(
size_t i = 0; i <
P; i++ ) {
1017 pthread_mutex_lock( &
mutex );
1018 pthread_cond_broadcast( &
cond );
1019 pthread_mutex_unlock( &
mutex );
1025 double maxtime = 0.0;
1026 for(
size_t i = 0; i <
P; i++ ) {
1028 if( curtime > maxtime ) maxtime = curtime;
1030 if( elapsed_time != NULL )
1031 *elapsed_time += maxtime;
1083 std::cerr <<
"Warning: RDBHilbert::getFirstIndexPair has no unique answer since it implements a parallel multiplication!\nIgnoring call..." << std::endl;
1089 for(
size_t s = 0; s <
P; ++s )
Hierarchical BICRS with fixed subblock size and distribution.
Definition: FBICRS.hpp:75
ULI nnz
Number of non-zeros.
Definition: SparseMatrix.hpp:58
virtual void zxa(const T *x, T *z)
Definition: RDBHilbert.hpp:914
virtual T * mv(const T *x)
Overloaded mv call; allocates output vector using numa_interleaved.
Definition: RDBHilbert.hpp:887
pthread_mutex_t end_mutex
Wait for end mechanism: mutex.
Definition: RDBHilbert.hpp:241
pthread_cond_t * cond
Pointer to the sync condition.
Definition: RDBHilbert.hpp:122
size_t output_vector_offset
Output vector offset (w.r.t.
Definition: RDBHilbert.hpp:140
double time
Will store local timing.
Definition: RDBHilbert.hpp:110
static const ULI max_m
Given FBICRS, the maximum value for the rowwise matrix size, assuming short ints on ICRS at the lower...
Definition: RDBHilbert.hpp:256
virtual void zxa(const T *x, T *z, const size_t repeat)
Definition: RDBHilbert.hpp:919
size_t * end_sync
End counter.
Definition: RDBHilbert.hpp:134
size_t * nzb
Will store rowsums.
Definition: RDBHilbert.hpp:107
T * local_y
Pointer to the local output vector.
Definition: RDBHilbert.hpp:143
pthread_cond_t cond
Stop/continue mechanism: condition.
Definition: RDBHilbert.hpp:238
const T ** input
Pointer to a global input vector.
Definition: RDBHilbert.hpp:146
size_t bytes
Will store the local amount of bytes used.
Definition: RDBHilbert.hpp:113
unsigned long int cores() const
The number of available cores.
Definition: MachineInfo.cpp:77
static void synchronise(pthread_mutex_t *mutex, pthread_cond_t *cond, size_t *sync, const size_t P)
Definition: RDBHilbert.hpp:437
A 128-bit integer, with overloaded comparison operators.
Definition: BigInt.hpp:38
virtual unsigned long int m()
Queries the number of rows this matrix contains.
Definition: SparseMatrix.hpp:107
size_t P
Number of SPMD processes.
Definition: RDBHilbert.hpp:89
T * output
Output vector.
Definition: RDBHilbert.hpp:217
virtual void load(std::vector< Triplet< T > > &input, const ULI m, const ULI n, const T zero)
Definition: RDBHilbert.hpp:339
static const MachineInfo & getInstance()
Gets a singleton instance.
Definition: MachineInfo.cpp:38
virtual void getFirstIndexPair(ULI &i, ULI &j)
Definition: RDBHilbert.hpp:1082
unsigned char mode
0 undef, 1 init, 2 zax, 3 zxa, 4 exit, 5 reset, 6 ZaX, 7 ZXa.
Definition: RDBHilbert.hpp:95
static void IntegerToHilbert(const size_t i, const size_t j, size_t &h1, size_t &h2)
New method, October 2010.
Definition: Matrix2HilbertCoordinates.cpp:48
RDBHilbert(const std::string file, T zero=0, std::vector< size_t > *_p_translate=NULL)
Default file-based constructor.
Definition: RDBHilbert.hpp:296
The Beta Hilbert triplet scheme.
Definition: RDBHilbert.hpp:201
size_t output_vector_size
Output vector length.
Definition: RDBHilbert.hpp:137
pthread_cond_t * end_cond
Pointer to the end condition.
Definition: RDBHilbert.hpp:128
void loadFromFile(const std::string file, const T zero=0)
Function which loads a matrix from a matrix market file.
Definition: SparseMatrix.hpp:89
Interface common to all sparse matrix storage schemes.
Definition: SparseMatrix.hpp:46
size_t end_sync
Used for construction end signal.
Definition: RDBHilbert.hpp:250
size_t repeat
How many times to repeat the operation set in `mode' (above, only for 2, 3, 6 and 7) ...
Definition: RDBHilbert.hpp:98
T ** output
Pointer to a global output vector.
Definition: RDBHilbert.hpp:149
pthread_mutex_t * end_mutex
Pointer to the end mutex.
Definition: RDBHilbert.hpp:125
ULI noc
Number of columns.
Definition: SparseMatrix.hpp:55
virtual ~RDBHilbert()
Base deconstructor.
Definition: RDBHilbert.hpp:308
std::vector< size_t > p_translate
Which processors to pin threads to.
Definition: RDBHilbert.hpp:211
RDB_shared_data()
In case of ZXa or ZaX, a pointer to a collection of global input vectors.
Definition: RDBHilbert.hpp:161
static size_t P
Number of threads to fire up.
Definition: RDBHilbert.hpp:208
static clockid_t global_clock_id
Clock type used for thread-local timing.
Definition: RDBHilbert.hpp:232
virtual size_t bytesUsed()
Definition: RDBHilbert.hpp:1087
static const ULI max_n
Given FBICRS, the maximum value for columnwise matrix size.
Definition: RDBHilbert.hpp:253
void reset()
Definition: RDBHilbert.hpp:976
RDBHilbert(std::vector< Triplet< T > > &input, ULI m, ULI n, T zero=0, std::vector< size_t > *_p_translate=NULL)
Default triplet-based constructor.
Definition: RDBHilbert.hpp:302
void wait()
Definition: RDBHilbert.hpp:332
size_t sync
Used for synchronising threads.
Definition: RDBHilbert.hpp:247
pthread_mutex_t * mutex
Pointer to the sync mutex.
Definition: RDBHilbert.hpp:119
static void end(pthread_mutex_t *mutex, pthread_cond_t *cond, size_t *sync, const size_t P)
Definition: RDBHilbert.hpp:426
static void collectY(RDB_shared_data< T > *shared)
Definition: RDBHilbert.hpp:853
ULI nor
Number of rows.
Definition: SparseMatrix.hpp:52
size_t * sync
Sync counter.
Definition: RDBHilbert.hpp:131
pthread_t * threads
Input vectors for multiple-RHS operations.
Definition: RDBHilbert.hpp:226
RDB_shared_data(size_t _id, size_t _P, std::vector< Triplet< double > > *_original, size_t *_nzb, pthread_mutex_t *_mutex, pthread_cond_t *_cond, pthread_mutex_t *_end_mutex, pthread_cond_t *_end_cond, size_t *_sync, size_t *_end_sync, size_t _ovsize, size_t _ovoffset, const T **const _in, T **const _out)
Recommended constructor.
Definition: RDBHilbert.hpp:172
std::vector< Triplet< T > > * original
How many SpMVs in a multiple-RHS computation (ZXa or ZaX).
Definition: RDBHilbert.hpp:104
T zero_element
The element considered to be zero.
Definition: SparseMatrix.hpp:63
pthread_mutex_t mutex
Stop/continue mechanism: mutex.
Definition: RDBHilbert.hpp:235
pthread_cond_t end_cond
Wait for end mechanism: condition.
Definition: RDBHilbert.hpp:244
const T * input
Input vector.
Definition: RDBHilbert.hpp:214
RDB_shared_data< T > * thread_data
array of initial thread data
Definition: RDBHilbert.hpp:229
size_t fillIn
Will store the total fillIn at this thread.
Definition: RDBHilbert.hpp:116
static void * thread(void *data)
Definition: RDBHilbert.hpp:450
virtual void zax(const T *x, T *z)
Definition: RDBHilbert.hpp:993
virtual unsigned long int n()
Queries the number of columns this matrix contains.
Definition: SparseMatrix.hpp:115
A single triplet value.
Definition: Triplet.hpp:52
size_t id
Process ID.
Definition: RDBHilbert.hpp:86
virtual void zax(const T *x, T *z, const size_t repeat, const clockid_t clock_id, double *elapsed_time)
Definition: RDBHilbert.hpp:998
Shared data for RDBHilbert threads.
Definition: RDBHilbert.hpp:81
void set_p_translate(std::vector< size_t > *_p_translate)
Sets p_translate to 0..P-1 by default, or equal to the optionally supplied vector.
Definition: RDBHilbert.hpp:259