Multithread Wrap
One of the challenges of writing multithreaded code is that it is often necessary to share data that was not designed to be used by multiple threads at the same time. In this blog post, I want to show two approaches to protecting such data: duplication and wrapping.
Duplication
The first approach is the simplest. Just duplicate the data for each thread. For this to work, the data has to meet a few criteria:
- be easy to identify,
- have no hidden parts,
- be easy to duplicate,
- have no essential requirements to be shared at all times.
If the data meets all these criteria, then duplication is the fastest and safest option. Usually, data that can be duplicated this way is essentially a group of values, like a pure struct in C++ containing simple, unchanging values.
Wrapping
If your data doesn’t meet the duplication criteria, you can use the second approach: wrapping the data. A common case is when you are given an interface that would need to be shared among multiple threads. Here are the steps to create a multithread wrap:
- Identify the interface that needs to be isolated.
- Write a thin multi-thread protector around the interface.
- Write a thin per-thread implementation of the interface.
To illustrate the technique, I will show you an example of a multi-thread wrapping I have recently done in C++. The code was part of the Tantrix Solver application I wrote. The particular item I needed to convert to multithreaded use was the progress report interface.
The code for that application is available on GitHub.
Identify the Interface
The first step is to fully identify what will be used by the threads. This may require some refactoring if it is a disparate group of items. In the progress example, it was an interface called progress_t. Note that it only has one virtual function that really needs to be made thread-safe: update_progress().
// Report progress of work.
//
// Not thread safe. Wrap in a multi_thread_progress_t if needed.
struct progress_t
{
   // Create a progress reporter.
   progress_t() = default;
   progress_t(size_t a_report_every) : my_report_every(a_report_every) {}

   // Force to report the progress tally.
   void flush_progress();

   // Clear the progress.
   void clear_progress();

   // Update the progress with an additional count.
   void progress(size_t a_done_count);

   size_t total_count_so_far() const;

   // How often the total is reported to update_progress().
   size_t my_report_every = 100 * 1000;

protected:
   // Update the total progress so far to the actual implementation.
   virtual void update_progress(size_t a_total_count_so_far) = 0;
};
Multithread Protector
The second step is to create a multi-thread protector. The design of all protectors is the same:
- Do not implement the interface to be protected.
- Keep custody of the original non-thread-safe interface implementation.
- Provide multi-thread protection, usually with a mutex.
- Provide protected access to the per-thread implementation.
The reason not to implement the desired interface is that the multi-thread protector is not meant to be used directly. If it doesn’t have the interface, it can’t be used accidentally as the interface.
Its implementation still mimics the interface very closely. The difference is that each corresponding function takes a lock on the mutex and then calls the original, non-thread-safe interface. This is how the data is protected against concurrent access from multiple threads.
Here is the example for the progress_t interface:
// Wrap a non-thread-safe progress in a multi-thread-safe progress.
//
// The progress can only be reported by a per-thread-progress referencing
// this multi-thread progress.
struct multi_thread_progress_t
{
   // Wrap a non-thread-safe progress.
   multi_thread_progress_t() = default;
   multi_thread_progress_t(progress_t& a_non_thread_safe_progress)
      : my_non_thread_safe_progress(&a_non_thread_safe_progress)
      , my_report_every(a_non_thread_safe_progress.my_report_every) {}

   // Report the final progress tally when destroyed.
   ~multi_thread_progress_t();

   // Force to report the progress tally.
   void flush_progress() { report_to_non_thread_safe_progress(my_total_count_so_far); }

   // Clear the progress.
   void clear_progress() { my_total_count_so_far = 0; }

protected:
   // Receive progress from a per-thread progress. (see below)
   void update_progress_from_thread(size_t a_count_from_thread);

   // Propagate the progress to the non-thread-safe progress.
   void report_to_non_thread_safe_progress(size_t a_count);

private:
   progress_t*          my_non_thread_safe_progress = nullptr;
   size_t               my_report_every = 100 * 1000;
   std::atomic<size_t>  my_total_count_so_far = 0;
   std::mutex           my_mutex;

   friend struct per_thread_progress_t;
};
The important functions are update_progress_from_thread() and report_to_non_thread_safe_progress(). The first one receives the progress from each per-thread progress implementation, which will be shown later. It accumulates the total in a thread-safe atomic variable and only reports it when it crosses a given threshold. The second function forwards the progress to the non-thread-safe implementation under the protection of a mutex. Here's the implementation for both:
void multi_thread_progress_t::update_progress_from_thread(size_t a_count_from_thread)
{
   if (!my_non_thread_safe_progress)
      return;

   const size_t pre_count = my_total_count_so_far.fetch_add(a_count_from_thread);
   const size_t post_count = pre_count + a_count_from_thread;

   if ((pre_count / my_report_every) != (post_count / my_report_every))
   {
      report_to_non_thread_safe_progress(post_count);
   }
}

void multi_thread_progress_t::report_to_non_thread_safe_progress(size_t a_count)
{
   std::lock_guard lock(my_mutex);
   my_non_thread_safe_progress->update_progress(a_count);
}
Per-Thread Implementation
The final part of the pattern is the thin per-thread implementation of the interface. In this case, we do want to implement the interface: this is what replaces the original, non-thread-safe implementation. Note that it doesn’t need to be thread-safe! It is meant to be used by a single thread; the multi-thread protection is done in the multi-thread protector shown above.
This division of labor between the protector and the per-thread part greatly simplifies both the code itself and the reasoning about it.
Here is the declaration of the per-thread progress in the example:
// Report the progress of work from one thread to a multi-thread progress.
//
// Create one instance in each thread. It caches the thread progress and
// only reports from time to time to the multi-thread progress to avoid
// accessing the shared atomic variable too often.
struct per_thread_progress_t : progress_t
{
   // Create a per-thread progress that reports to the given multi-thread progress.
   per_thread_progress_t() = default;
   per_thread_progress_t(multi_thread_progress_t& a_mt_progress)
      : progress_t(a_mt_progress.my_report_every / 10)
      , my_mt_progress(&a_mt_progress) {}

   per_thread_progress_t(const per_thread_progress_t& an_other)
      : progress_t(an_other), my_mt_progress(an_other.my_mt_progress)
   {
      clear_progress();
   }

   per_thread_progress_t& operator=(const per_thread_progress_t& an_other)
   {
      progress_t::operator=(an_other);

      // Avoid copying the per-thread progress accumulated.
      clear_progress();
      return *this;
   }

   // Report the final progress tally when destroyed.
   ~per_thread_progress_t();

protected:
   // Propagate the progress to the multi-thread progress.
   void update_progress(size_t a_total_count_so_far) override
   {
      if (!my_mt_progress)
         return;

      my_mt_progress->update_progress_from_thread(a_total_count_so_far);
      clear_progress();
   }

private:
   multi_thread_progress_t* my_mt_progress = nullptr;
};
Conclusion
I’ve used this pattern multiple times to solve multi-threading problems, and it has served me well. Feel free to reuse this design wherever you need it!
The particular example for the progress report interface is found in the “utility” library of the Tantrix Solver project available on GitHub.