
Unstable run-times when multithreading with Boost

I read the very nice article “C++ Multithreading in Boost” on Quantnet, and tried my own simple example.

The program computes the n-word XOR checksum of the consecutive integers 0, 1, …. For example, for n = 3 the words are laid out like this:

0 1 2
3 4 5
6 7 8
: : :
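For reference, a minimal serial sketch of that computation (illustrative only; the tiny range and the word type are just for the example, and the threaded program is further below):

Code:
#include <iostream>
#include <vector>

int main()
{
    typedef long long IntType;
    const unsigned n = 3;        // words per checksum row
    const IntType rows = 4;      // rows: 0 1 2 / 3 4 5 / 6 7 8 / 9 10 11

    std::vector<IntType> value(n, 0);
    IntType counter = 0;

    for (IntType j = 0; j < rows; ++j)      // one row of n consecutive integers
        for (unsigned k = 0; k < n; ++k)
            value[k] ^= counter++;          // word k accumulates column k

    // prints "12 8 4": 0^3^6^9, 1^4^7^10, 2^5^8^11
    for (unsigned k = 0; k < n; ++k)
        std::cout << value[k] << (k + 1 < n ? ' ' : '\n');
    return 0;
}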

The strange thing is that on a quad-core computer, the performance initially decreased as threads were added and only gradually improved, reaching its best around 30 threads. Also, the run time was unstable for small numbers of threads (>= 2); for example, 4 threads occasionally ran as fast as one would expect on a quad-core (as fast as 17.44 s in the example below).

My question: Is there a bug in my code, or is there an explanation for this behaviour?

The output for the case n = 1 follows, and the source code is below.

>checksum.exe 9999999999 1 1

4 cores/processors detected on this system.

----- 1 word (8 byte) checksum of 0 through 9999999999 is: 0x00000002540be3ff-----
1 thread(s) used in parallel
61.24 s

...
2 thread(s) used in parallel
93.63 s

3 thread(s) used in parallel
76.76 s

4 thread(s) used in parallel
49.33 s

5 thread(s) used in parallel
20.48 s

10 thread(s) used in parallel
45.35 s

20 thread(s) used in parallel
25.83 s

30 thread(s) used in parallel
21.70 s

40 thread(s) used in parallel
23.91 s

Code:
#include <cmath>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <vector>
 
#include <boost/format.hpp>
#include <boost/progress.hpp>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
 
using namespace std;
 
using boost::format;
using boost::io::group;
using boost::progress_timer;
using boost::thread;
using boost::thread_group;
using boost::lexical_cast;
 
/** class for computing checksums */
 
template<typename IntType = long>
class checksum
{
public:
    /// n_ is the number of elements of type IntType in each checksum
    checksum(unsigned n_ = 1) : n(n_), value(n) {}
 
    /// Adds a single word to the current checksum
    template<typename RanIter>
    void operator()(RanIter begin);
 
    /// get current checksum
    vector<IntType> getChecksum() const {return value;}
private:
    const unsigned n;
 
    vector<IntType> value;
};
 
template<typename IntType>
template<typename RanIter>
inline void checksum<IntType>::operator()(RanIter begin)
{
    for (unsigned i = 0; i < n; ++i, ++begin)
        value[i] ^= *begin;
}
 
/** class used to create threads */
 
template<typename IntType = long>
class consecutive_checksum
{
public:
    consecutive_checksum(IntType length_ = 1, IntType start = 0, unsigned words = 1) : length(length_), counter(start), n(words), sum(n) {}
 
    void operator()();
 
    vector<IntType> getChecksum() const {return sum.getChecksum();}
private:
    const IntType length;
    IntType counter;
    const unsigned n;
 
    checksum<IntType> sum;
};
 
template<typename IntType>
void consecutive_checksum<IntType>::operator()()
{
    vector<IntType> values(n);
 
    for (IntType j = 0; j < length; ++j)
    {
        for (unsigned k = 0; k < n; ++k)
            values[k] = counter++;
 
        sum(values.begin());
    }
}
 
/** main thread */
 
int main(int argc, char * argv[])
{
    if (argc != 4)
    {
        cerr << "Incorrect number of arguments.\n";
        exit(EXIT_FAILURE);
    }
 
    typedef long long IntType;    /// integer type to use for counters
 
    const IntType i            = lexical_cast<IntType>(argv[1]);
    const unsigned n        = lexical_cast<unsigned>(argv[2]);
    const unsigned threads    = lexical_cast<unsigned>(argv[3]);
 
    cout << endl << format("%1% cores/processors detected on this system.\n\n")
        % thread::hardware_concurrency();
 
    progress_timer timer;
 
    IntType iterationsPerThread = IntType( ceil(double(i) / threads) );    /// round up
 
    thread_group threadSet;
 
    typedef consecutive_checksum<IntType> consCSType;
    vector< shared_ptr<consCSType> > csum;
 
    for (unsigned m = 0; m < threads; ++m)
    {
        IntType iterations( m < threads - 1 ? iterationsPerThread : i - iterationsPerThread * (threads - 1) );
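        /// each thread checksums a disjoint block of rows; the last thread takes whatever remains after the first (threads - 1) equal blocks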
 
        shared_ptr<consCSType> p( new consCSType(iterations, iterationsPerThread * m * n, n) );
        csum.push_back(p);
        /// create a thread using a reference so that the class gets updated
        threadSet.create_thread( ref(*csum[m].get()) );
    }
 
    /// wait for all the threads to complete
    threadSet.join_all();
 
    /// combine the checksums of each thread
    vector<IntType> result(n);
    for (unsigned m = 0; m < threads; ++m)
    {
        vector<IntType> cs( csum[m] -> getChecksum() );
        for (unsigned k = 0; k < n; ++k)
            result[k] ^= cs[k];
    }
 
    cout << format("----- %4% word (%5% byte) checksum of %2% through %3% is: ")
        % 1 % 0 % (i * n) % n % (n * sizeof(IntType));
 
    /// output the checksum in hexadecimal
    cout << "0x";
    for (vector<IntType>::const_iterator iter = result.begin(); iter != result.end(); ++iter)
    {
        cout << format("%1%") % group(hex, setw(sizeof(IntType) * 2), setfill('0'), *iter);
    }
    cout << "-----\n";
 
    cout << format("%1% thread(s) used in parallel\n") % threads;
 
    return EXIT_SUCCESS;
}
 
Hi,
I am one of the authors of the Boost Thread article. I ran your program with args 10^6, 1, n for n = 1, 2, 30 (on a 2-core machine):

n = 1, 14 seconds
n = 2, 20 secs
n = 30, 53 secs

I have not done a detailed code review but some initial ideas are:

1. The problem is too small for MT.
2. Load balancing among threads is not uniformly distributed.
3. Cache hit rate and false sharing of data among processors.

What work decomposition are you using? Are there dependencies between threads?
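To expand on point 3: even when no thread ever writes another thread's data, false sharing can still occur if the per-thread accumulators are allocated close together and land on the same cache line, so the cores keep invalidating each other's caches on every update. A minimal sketch of the usual countermeasure; the 64-byte line size and the padded struct are assumptions for illustration, not something from your program:

Code:
#include <iostream>

// Pad each per-thread accumulator to an (assumed) 64-byte cache line so
// that accumulators owned by different threads never share a line.
struct PaddedSum
{
    long long value;
    char pad[64 - sizeof(long long)];
};

int main()
{
    PaddedSum sums[4] = {};                  // one slot per thread
    std::cout << sizeof(PaddedSum) << '\n';  // 64: each slot fills one line
    return 0;
}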
 

Hi,

Thanks for running it. Maybe you meant 10^9? I would be very surprised if a dual core could be more than 1000x slower than my computer (Q9400 @ 2.66GHz).

My understanding of MT does not include much beyond your article.

For (1), the problem is definitely small in the sense that all of the work is done in consecutive_checksum<IntType>::operator()(), and the main loop does not do much in each iteration. I'm not sure how a program can be "too small" for MT.

For (2), the load balancing should be nearly perfect in this program.

For (3), I suspected it might have something to do with the cache. There should be no shared data between the threads.

I'm not familiar with the term "work decomposition". There are no dependencies between threads, at least none that I am aware of.

I tried putting a yield() call in the main loop, as in your PowerClass. The run time increased by a factor of >40, but it showed more "normal" behaviour: the run times were more stable and decreased with the number of threads, reaching an optimum at 3 threads.
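For concreteness, this is roughly what I tried (a sketch only, using boost::this_thread::yield(); the placement may not match your PowerClass exactly):

Code:
template<typename IntType>
void consecutive_checksum<IntType>::operator()()
{
    vector<IntType> values(n);

    for (IntType j = 0; j < length; ++j)
    {
        for (unsigned k = 0; k < n; ++k)
            values[k] = counter++;

        sum(values.begin());

        boost::this_thread::yield();   // give up the rest of the time slice
    }
}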
 
I ran 10^8 on a Vostro 200 with VS2010.

!!!!! DEBUG MODE !!!!!
for n = 1, 143 s
for n = 2, 517 s

Now, I modified a few things, but this one change alone (returning the checksum by const reference instead of by value)

Code:
/// get current checksum
const vector<IntType>& getChecksum() const {return value;}

// vector<IntType> getChecksum() const {return value;}

resulted in

n = 2, time = 245 s (twice as fast!!)

It seems to be some kind of cache problem, but pinpointing it is not obvious yet.

Some suggestions:

1. Can we remove consecutive_checksum?
2. Maybe use STL transform() with function objects to merge the vectors (a sketch follows below)?
3. Use Boost.Pool to manage memory (it's much faster than new).
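On suggestion 2, something along these lines; a self-contained sketch with std::transform and std::bit_xor, illustrative only and not wired into the full program:

Code:
#include <algorithm>
#include <functional>
#include <iostream>
#include <vector>

int main()
{
    typedef long long IntType;

    std::vector<IntType> result(2, 0);   // running combined checksum (n = 2 words)
    std::vector<IntType> cs(2);          // checksum returned by one thread
    cs[0] = 0x0F; cs[1] = 0xF0;

    // result[k] ^= cs[k] for every word k, replacing the hand-written loop in main()
    std::transform(result.begin(), result.end(), cs.begin(),
                   result.begin(), std::bit_xor<IntType>());

    std::cout << std::hex << result[0] << ' ' << result[1] << '\n';   // f f0
    return 0;
}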
 
To qualify: the previous timings were in Debug mode.

Now, in Release mode with your data 10^10

I get

n = 1, 97 s
n = 2, 237 s

which is in line with your results.
 
Now, if I use raw pointers instead of smart(?) pointers, I get for 10^10:

n = 2, 47 s on 2 cores (thus, linear speedup).

So, smart ptrs ;) ;)
Code:
typedef consecutive_checksum<IntType> consCSType;

// vector< shared_ptr<consCSType> > csum;
vector< consCSType* > csum;

for (unsigned m = 0; m < threads; ++m)
{
    IntType iterations( m < threads - 1 ? iterationsPerThread : i - iterationsPerThread * (threads - 1) );

    // shared_ptr<consCSType> p( new consCSType(iterations, iterationsPerThread * m * n, n) );
    consCSType* p = new consCSType(iterations, iterationsPerThread * m * n, n);
    csum.push_back(p);

    /// create a thread using a reference so that the class gets updated
    // threadSet.create_thread( ref(*csum[m].get()) );
    threadSet.create_thread( ref(*csum[m]) );
}
 

I tried using raw pointers instead, but got no improvement (no apparent difference) :( Note that I sometimes (~50% of the time) get linear speedup with 2 threads, but at other times it is slower than 1 thread, as above. Were you getting linear speedup consistently?

I'm trying a slightly less trivial extension, and will see how it goes ...
 
I tried it again (Boost 1.47) a few times for each scenario with n = 2. Range is

Smart [270, 310] s
Raw [46, 57] s

Just make sure you are not running other processes ...
 

I tried it again on my dual core laptop at home. The results are now stable but 2 threads are about 50% slower than 1. I tried to close as many other processes as possible in Task Manager, but there are still over 50 processes running (and I can't even do that on my work computer).

I am seeing no difference between raw pointers and smart pointers. In my original code, there was some mixing of the Boost libraries with the MS implementation of TR1 because of the using namespace std; directive.

I don't know whether that could be making your smart pointers slower. Here is my new, corrected code, which uses conditional compilation to choose between raw and smart pointers.

Code:
/// Uncomment the following line to use raw pointers instead of smart pointers
//#define    USE_RAW_POINTERS

#include <cmath>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <vector>

#include <boost/format.hpp>
#include <boost/progress.hpp>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>

using std::ceil;
using std::cout;
using std::cerr;
using std::endl;
using std::exit;
using std::hex;
using std::setw;
using std::setfill;
using std::vector;

using boost::format;
using boost::io::group;
using boost::progress_timer;
using boost::thread;
using boost::thread_group;
using boost::lexical_cast;
using boost::shared_ptr;
using boost::ref;

/** class for computing checksums */

template<typename IntType = long>
class checksum
{
public:
    /// n_ is the number of elements of type IntType in each checksum
    checksum(unsigned n_ = 1) : n(n_), value(n) {}

    /// Adds a single word to the current checksum
    template<typename RanIter>
    void operator()(RanIter begin);

    /// get current checksum
    vector<IntType> getChecksum() const {return value;}
private:
    const unsigned n;

    vector<IntType> value;
};

template<typename IntType>
template<typename RanIter>
inline void checksum<IntType>::operator()(RanIter begin)
{
    for (unsigned i = 0; i < n; ++i, ++begin)
        value[i] ^= *begin;
}

/** class used to create threads */

template<typename IntType = long>
class consecutive_checksum
{
public:
    consecutive_checksum(IntType length_ = 1, IntType start = 0, unsigned words = 1) : length(length_), counter(start), n(words), sum(n) {}

    void operator()();

    vector<IntType> getChecksum() const {return sum.getChecksum();}
private:
    const IntType length;
    IntType counter;
    const unsigned n;

    checksum<IntType> sum;
};

template<typename IntType>
void consecutive_checksum<IntType>::operator()()
{
    vector<IntType> values(n);

    for (IntType j = 0; j < length; ++j)
    {
        for (unsigned k = 0; k < n; ++k)
            values[k] = counter++;

        sum(values.begin());
    }
}

/** main thread */

int main(int argc, char * argv[])
{
    if (argc != 4)
    {
        cerr << "Incorrect number of arguments.\n";
        exit(EXIT_FAILURE);
    }

    typedef long long IntType;    /// integer type to use for counters

    const IntType i            = lexical_cast<IntType>(argv[1]);
    const unsigned n        = lexical_cast<unsigned>(argv[2]);
    const unsigned threads    = lexical_cast<unsigned>(argv[3]);

    cout << endl << format("%1% cores/processors detected on this system.\n\n")
        % thread::hardware_concurrency();

    progress_timer timer;

    IntType iterationsPerThread = IntType( ceil(double(i) / threads) );    /// round up

    thread_group threadSet;

    typedef consecutive_checksum<IntType> consCSType;
#ifdef    USE_RAW_POINTERS
    typedef consCSType * consCSPointer;
#else
    typedef shared_ptr<consCSType> consCSPointer;
#endif
    vector< consCSPointer > csum;

    for (unsigned m = 0; m < threads; ++m)
    {
        IntType iterations( m < threads - 1 ? iterationsPerThread : i - iterationsPerThread * (threads - 1) );

        consCSPointer p( new consCSType(iterations, iterationsPerThread * m * n, n) );
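        /// (with USE_RAW_POINTERS these objects are never deleted; they are simply reclaimed when the process exits)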

        csum.push_back(p);

        /// create a thread using a reference so that the class gets updated
#ifdef    USE_RAW_POINTERS
        threadSet.create_thread( ref(*csum[m]) );
#else
        threadSet.create_thread( ref(*csum[m].get()) );
#endif
    }

    /// wait for all the threads to complete
    threadSet.join_all();

    /// combine the checksums of each thread
    vector<IntType> result(n);
    for (unsigned m = 0; m < threads; ++m)
    {
        vector<IntType> cs( csum[m] -> getChecksum() );
        for (unsigned k = 0; k < n; ++k)
            result[k] ^= cs[k];
    }

    cout << format("----- %4% word (%5% byte) checksum of %2% through %3% is: ")
        % 1 % 0 % (i * n) % n % (n * sizeof(IntType));

    /// output the checksum in hexadecimal
    cout << "0x";
    for (vector<IntType>::const_iterator iter = result.begin(); iter != result.end(); ++iter)
    {
        cout << format("%1%") % group(hex, setw(sizeof(IntType) * 2), setfill('0'), *iter);
    }
    cout << "-----\n";

    cout << format("%1% thread(s) used in parallel\n") % threads;

    return EXIT_SUCCESS;
}
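For reference, roughly how I build and run it (the paths are illustrative, and Boost.Thread auto-links under VS2010):

Code:
cl /EHsc /O2 /I C:\boost_1_47_0 checksum.cpp /link /LIBPATH:C:\boost_1_47_0\lib
checksum.exe 9999999999 1 4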
 