Using Data Structures Safely with Threads


Introduction

There are many ways to make an algorithm and its accompanying data structure thread safe. In this twiki we address, in order of preference, the ways to handle thread safety.

In the C++11 standard, the language explicitly says that 'data races are illegal'. A data race occurs when multiple unsynchronized concurrent accesses happen on the same piece of memory and at least one of those accesses is a write. So thread safety boils down to properly synchronizing access to data.
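
As a minimal illustration, the following sketch contains a data race: two threads write the same plain int with no synchronization, so the program's behavior is undefined.

#include <thread>

int counter = 0; //a plain int with no synchronization

int main() {
  //both threads access counter concurrently and the accesses are writes:
  //this is a data race, which C++11 declares to be undefined behavior
  std::thread t1([]{ ++counter; });
  std::thread t2([]{ ++counter; });
  t1.join();
  t2.join();
  //counter may be 1 or 2, or anything else may happen
}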

const and Thread Safety

The C++11 standard guarantees that it is safe to interact with an instance of a class from multiple threads as long as those threads only call const member functions. It makes no such guarantee for simultaneous calls to non-const member functions. Therefore all classes provided by the standard library, e.g. std::vector<>, can safely be accessed from multiple threads in this manner.

CMS data structures which are allowed to be accessed from multiple threads, e.g. data products in the Run, LuminosityBlock, Event or EventSetup, are required to also be safe to access via simultaneous calls to const member functions.
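
A minimal sketch of the allowed access pattern (the helper function here is hypothetical): two threads reading the same std::vector<> through const member functions only.

#include <thread>
#include <vector>

//uses only const member functions of 'values'
double sumOf(std::vector<double> const& values) {
  double sum = 0.;
  for(double v: values) { sum += v; }
  return sum;
}

int main() {
  std::vector<double> values(100, 1.);
  //safe: both threads perform only const accesses on values
  std::thread t1([&values]{ sumOf(values); });
  std::thread t2([&values]{ sumOf(values); });
  t1.join();
  t2.join();
}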

Separate Memory Per Thread

The best way to deal with synchronization is to avoid it altogether. By giving each thread its own memory there is no possibility of a data race. This approach is also guaranteed to scale perfectly as CPU core counts increase. Furthermore, since the data structure will never be accessed by multiple threads, its const member functions do not even have to be thread-safe.

Sometimes you need to create a summary of information over a large collection. Take for example a large numeric integration. In the end you need one number. However, to get the answer you can apply what is known as 'map-reduce'. You can break up the integration into different ranges where each range runs on its own thread and has its own memory location to store the result of the 'partial' integration. [This is known as mapping the integration onto a set of ranges.] Then once all the partial integrations are done you collect the list of 'partial' values and sum them together to get the full integration value. [This is known as reducing the partial integrations into a final integration.]

Example: Use TBB's parallel_for to break a calculation into kMaxPartition chunks and run them concurrently

#include <vector>
#include "tbb/parallel_for.h"

//This is used to hold our results
//NOTE: use parentheses, not braces, so the vector has kMaxPartition elements
std::vector<double> results(kMaxPartition);

//Use a parallel for loop with a C++11 lambda to do the mapping
tbb::parallel_for(size_t(0), size_t(kMaxPartition),
                  [&results](size_t i){ results[i] = doIntegralFor(i); } );

//Now do the reduce step
double result = 0.;
for(double v: results) { result += v; }

Use Atomics

C++11 added the std::atomic<> type to the standard. This is the only type the language guarantees to be thread-safe. That is, it is safe for different threads to simultaneously read and write an instance of that type. The outcome of a simultaneous read and write (or multiple writes) is not knowable beforehand; however, the C++ standard guarantees that all threads will agree on the order in which the reads and writes occurred. For example, let's say we have a variable std::atomic<int> ai{0}; and two threads doing the following operations

Thread 1: int b = ai;
Thread 2: int c = ++ai;

Then the C++ standard says that all threads will agree that either b==0, c==1 or b==1, c==1. Similarly in the case

Thread 1: int b = ++ai;
Thread 2: int c = ++ai;

then all threads will agree that either b==1, c==2 or b==2, c==1. In contrast, if ai were a simple int then the standard gives no guarantees and the values of b and c could be any combination of 0, 1, 2, or even something else entirely.

Another property of std::atomic<> is that it defines a thread-safe 'ordering' of the operations which occur before and after each operation on the std::atomic<>. Say we have int a=0, b=0; std::atomic<bool> isSet{false};. Then because of the ordering guarantee it is safe to do

Thread 1: a=2; b=3; isSet = true;
Thread 2: while(not isSet) {}; std::cout<<a<<' '<<b<<std::endl;

The standard guarantees the output will be 2 3 because all threads will agree that a and b were modified before isSet was set. NOTE: gcc 4.7 does not implement this part of the standard so we must insert __sync_synchronize() to force the proper ordering:

Thread 1: a=2; b=3; __sync_synchronize(); isSet = true;
Thread 2: while(not isSet) {}; __sync_synchronize(); std::cout<<a<<' '<<b<<std::endl;

The use of std::atomic<> does not come for free. In order to have all threads agree on its value it requires all cores in a computer to synchronize their local memory caches. Synchronizing caches is a relatively slow operation. In addition the ordering guarantee forces the compiler and the CPU to avoid certain optimizations (e.g. memory prefetching). However, if the algorithm requires synchronization across threads std::atomic<> is the most performant way to do that.

Counting

If you require a simple counter that is updated by multiple threads then std::atomic<> is the best type for the job. The increment and decrement operators are thread safe.
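
A minimal sketch of such a counter (the names here are hypothetical):

#include <atomic>

std::atomic<unsigned int> nEventsSeen{0};

//safe to call concurrently from any number of threads
void countEvent() {
  ++nEventsSeen; //atomic increment, no lock required
}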

Lazy caching of a pointer

If you want to delay the calculation of a quantity until it is first requested and that value is going to be stored in a pointer then you can use an std::atomic<T*> and the member function compare_exchange_strong.

In the header file for a class

#include <atomic>
...
class Blah {
   ...
   //mutable since it is modified from the const function Blah::foo
   mutable std::atomic<Foo*> m_foo{nullptr};
};

In the source file for a class

#include <memory>
...
const Foo& Blah::foo() const {
   if(nullptr == m_foo.load()) {
      //doesn't seem to be set yet
      std::unique_ptr<Foo> f{ new Foo(...) };

      //see if we should keep our instance
      Foo* expected = nullptr; //on failure, changed to the value presently in m_foo
      if(m_foo.compare_exchange_strong(expected, f.get()) ) {
         //m_foo was equal to nullptr and now is equal to f.get()
         f.release();
      }
      //if the comparison fails then another thread filled in m_foo
      // so we just let the unique_ptr delete our calculation of the value
   }
   return *m_foo;
}

The compare_exchange_strong checks whether the std::atomic<> is equal to the first argument; only if it is does it change its value to the second argument. If it is not equal to the first argument, then the first argument is changed to the value presently held in the std::atomic<>. This comparison and exchange is guaranteed to be non-interruptible (i.e. atomic).
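
A minimal standalone sketch of those semantics:

#include <atomic>
#include <cassert>

int main() {
  std::atomic<int> value{5};

  int expected = 3;
  //fails: value holds 5, not 3; expected is updated to 5
  assert(not value.compare_exchange_strong(expected, 7));
  assert(expected == 5);

  //succeeds: value holds 5, so it is atomically changed to 7
  assert(value.compare_exchange_strong(expected, 7));
  assert(value.load() == 7);
}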

Lazy caching of a value

It is possible to use an std::atomic<char> to deal with the case where you want to cache a value the first time it is requested.

In the header file for a class

#include <atomic>
...
class Blah {
   ...
   //mutable since both are modified from the const function Blah::foo
   mutable Foo m_foo;
   mutable std::atomic<char> m_fooState{kUnset};
   enum FooStates {kUnset, kSetting, kSet};
};

In the source file for a class

Foo Blah::foo() const {
   if(kSet==m_fooState.load()) return m_foo;

   //need to make one
   Foo tmp{...};

   //Try to cache
   char expected = kUnset;
   if(m_fooState.compare_exchange_strong(expected, kSetting) ) {
     //it is our job to set the value
     m_foo.swap(tmp);
     
     //this must be after the swap
     m_fooState.store(kSet);
     return m_foo;
   }
   //another thread beat us to trying to set m_foo
   // since we don't know when the other thread will finish
   // we just return tmp
   return tmp;
}

It is important that the function return by 'value' and not by 'const reference' since m_foo may or may not be set by the time Blah::foo returns in the case where the m_fooState.compare_exchange_strong fails. The other thread, which is responsible for setting the value of m_foo, may have been stalled by the operating system for an indefinite amount of time.

This pattern would cause multiple threads to do the same work (making Foo) in the, hopefully unlikely, case where two threads try to update the value simultaneously.

If a value must be returned by 'const reference' it is possible to have the thread which fails the m_fooState.compare_exchange_strong do

} else {
   while(kSet != m_fooState.load()) {}
}
return m_foo;

This pattern is not recommended since it causes a core to do no useful work and it slows down the other cores in the system because they have to constantly synchronize their memory caches.

Use a TBB Container

Intel's Threading Building Blocks (TBB) library provides thread-safe containers. See their user guide and reference manual for full details. The containers handle concurrent updates and reads, although they do not support thread-safe modification of the objects they contain. If modification of a contained object is required then either the object must be internally thread-safe for updates or one must use an external synchronization mechanism (see SerialTaskQueue and Locks below). Below we describe two of the TBB containers.

tbb::concurrent_queue<T>

The tbb::concurrent_queue<T> is a first-in/first-out container which allows multiple threads to safely push and try_pop items simultaneously. The push adds a copy of the object to the container while try_pop removes an item from the container and returns a copy of the removed object (if the container held one). A typical usage would be

Shared variables:

tbb::concurrent_queue<double> resultsQueue;

Worker thread:

//does some work
double result = long_calculation(...);

//pushes results to queue
resultsQueue.push(result);

//communicate to a waiting thread that new results are in
signal_waiting_thread();

Results thread:

double fullValue =0.;
unsigned int nRunningWorkers = kMaxWorkers;
do {
  //have thread sleep until a worker signals us
  yield_till_signaled();
  double partialResult=0.;

  //keep pulling new values until reach end
  while(resultsQueue.try_pop(partialResult)) {
    fullValue += partialResult;
    --nRunningWorkers;
  }
} while(0 != nRunningWorkers);

tbb::concurrent_vector<T>

tbb::concurrent_vector<T> is a random access container which allows concurrent push_back and access. Unlike a std::vector<>, a tbb::concurrent_vector<> does not invalidate any iterators when the container grows. Although reads and writes of the container itself are thread safe, there is no guarantee that an object being requested from the container has finished being constructed. It is up to the user to guarantee that the object has finished construction before it is used. The TBB documentation suggests adding an std::atomic<bool> to the contained class and updating it as the last step of construction. Alternatively one could store std::atomic<T*> in the container.
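
A minimal sketch of that suggestion (the element type and functions are hypothetical, and here the flag is set just after the container grows rather than inside a constructor):

#include <atomic>
#include "tbb/concurrent_vector.h"

struct Element {
  double value = 0.;
  std::atomic<bool> isSet{false};
};

tbb::concurrent_vector<Element> elements;

void addValue(double v) {
  auto itr = elements.grow_by(1); //default constructs one new element
  itr->value = v;
  itr->isSet.store(true); //last step: mark the element as ready to use
}

//readers must see isSet == true before they may safely use value
bool isReady(Element const& e) { return e.isSet.load(); }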

Example: Say you have setup a per Run cache object

struct RunCache {
   mutable tbb::concurrent_vector<double> partialResults;
};

And a per thread cache (or in cmsRun a per Stream cache)

struct ThreadCache {
   double* partialResult;
};

One RunCache is created per Run, and the first time one of your threads sees that Run it creates an entry in the vector to hold its partial result

auto itr = runCache.partialResults.push_back(0);
//NOTE: we do not have to worry about the object not yet being fully constructed
// because we are in the same thread that did the push_back
threadCache.partialResult = &(*itr);

Then as the thread processes the data for the Run it just updates its value

*(threadCache.partialResult) += the_calculation(...);

Once all threads are done with the Run, the last thread can pull together all the results

double fullResult = 0.;
for(double partial: runCache.partialResults) {
   fullResult += partial;
}

Use a SerialTaskQueue

If an algorithm that will only be run within cmsRun needs to serialize (allow only one thread at a time) access to a non-thread-safe data structure, e.g. a std::vector<> in which you are accumulating information about many events, then consider using a SerialTaskQueue. You create one SerialTaskQueue for each data structure you need to interact with on multiple threads. Whenever you need to perform an operation on the data structure, you push a 'task' that does that operation onto the queue. The queue then makes sure to run one and only one task at a time. This guarantees serial access to the data structure and therefore thread safety. The 'tasks' managed by the SerialTaskQueue are just functor objects which take no arguments and return no values. The simplest way to create a task is to use a C++11 lambda.

Example: Imagine we have the following data structures.

#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
...
const unsigned int kMax=1000;
std::vector<int> values;
edm::SerialTaskQueue valuesQueue;

On thread 1 we can fill the vector

for(unsigned int i=0; i<kMax; ++i) {
  valuesQueue.pushAndWait( [&values,i]{ values.push_back(i);} );
}

While on thread 2 we periodically print and stop when the vector is filled

bool stop = false;
while(not stop) {
  valuesQueue.pushAndWait([&stop,&values] {
    if( 0 == (values.size() % 100) ) {
       std::cout <<values.size()<<std::endl;
    }
    if(values.size()==kMax) {
      stop = true;
    }
  });
}

The SerialTaskQueue internally uses Intel's Threading Building Blocks (TBB) task management to handle running the tasks on available threads. This is why we suggest only using a SerialTaskQueue from within cmsRun. One benefit of using TBB is that when one thread has to wait for another thread to finish its use of the queue, TBB can find other waiting tasks to run. This opportunistic ability to run tasks means the thread on which the waiting task is running will not sit idle while the queue is occupied with other work.

Because of the need for TBB, a SerialTaskQueue should not be used internally by a Run, LuminosityBlock or Event data product. The best use of a SerialTaskQueue is as member data of an algorithm, e.g. an EDProducer, where it provides thread-safe access to other member data.
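
A minimal sketch of that pattern (the class and member names are hypothetical):

#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
#include <vector>

class SumProducer {
public:
   //may be called concurrently from many threads
   void addPartialSum(double value) {
      //the queue runs one and only one task at a time,
      //serializing all access to m_sums
      m_queue.pushAndWait([this, value]{ m_sums.push_back(value); });
   }
private:
   edm::SerialTaskQueue m_queue;
   std::vector<double> m_sums; //only touched by tasks on m_queue
};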

Use Locks

The last resort for serializing access to a shared resource is to acquire a 'lock' before performing an operation on one of its data structures and to release the 'lock' after finishing. Such locking can be accomplished with the C++11 class std::mutex and the helper std::lock_guard<std::mutex>. However, due to its cost and implications, its usage should be limited to cases where none of the above mentioned methods is possible.
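
A minimal sketch of the locking pattern (the class is hypothetical):

#include <mutex>
#include <vector>

class Accumulator {
public:
   //safe to call from many threads
   void add(double value) {
      std::lock_guard<std::mutex> guard{m_mutex}; //lock released when guard is destroyed
      m_values.push_back(value);
   }
private:
   std::mutex m_mutex;
   std::vector<double> m_values;
};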

There are four downsides to using a mutex. First, they are slower (sometimes much slower) than the other synchronization mechanisms, because the operating system may be called when there is lock contention (i.e. more than one thread tries to get the lock at the same time). Second, they are relatively large: on Linux a std::mutex takes 40 bytes. Third, if another thread already holds the lock then the waiting thread does nothing until it can acquire the lock. Given that cmsRun usually assigns only one thread per core, waiting for a lock leaves a core with no work and therefore slows the processing of our data. Fourth, threads which hold multiple mutexes are prone to deadlocks, causing the entire process to hang. We believe there are only a very limited number of use cases for mutexes, mostly within the IO subsystem. If you believe you need a mutex, please contact the CMSSW core framework team.
