How to Handle Multiple Threads in C++: A Guide to Concurrency
Understanding Concurrency and Thread Communication
Concurrent code lets several threads make progress at the same time, and this text discusses how to handle that. As discussed previously, code with multiple threads has implications that must be handled. A thread must be able to hand its processed data over to a waiting thread without interference from other threads. If the data is shared, a thread that wants to access it locks a mutex, which blocks other threads that try to lock the same mutex. As soon as the thread is done with the data, it unlocks the mutex and other threads are free to acquire it.
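The locking idea described above can be sketched as follows. This is a minimal illustration and not part of the example class; the function name count_with_mutex and its parameters are hypothetical. It shows several threads updating one shared counter through a mutex that all of them share.

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: several threads increment one shared counter.
// The mutex is captured by reference, so every thread locks the SAME
// mutex object; that is what makes the locking effective.
int count_with_mutex(int n_threads, int increments_per_thread) {
    int counter = 0;           // shared data
    std::mutex counter_mutex;  // one mutex guarding the shared data

    std::vector<std::thread> workers;
    for (int i = 0; i < n_threads; ++i) {
        workers.emplace_back([&counter, &counter_mutex, increments_per_thread] {
            for (int k = 0; k < increments_per_thread; ++k) {
                std::lock_guard<std::mutex> lock(counter_mutex);  // locks here
                ++counter;
            }  // lock_guard unlocks at the end of each loop iteration
        });
    }
    for (auto& w : workers) w.join();
    return counter;
}
```

Calling count_with_mutex(4, 1000) should return 4000, because no increment is lost while the mutex is held.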
Using Condition Variables for Thread Synchronization
If many threads work simultaneously and only a few of them need the data from a specific thread, a notification must be sent to announce that the data is ready. The notification must also be received correctly by the waiting thread. Condition variables are used for this. A thread that waits on a condition variable is blocked until it is notified and a specific condition is met; it then wakes up and continues. Threads that do not need the data should not wait on that condition variable and are therefore not notified. How this works is discussed in the text and shown in code.
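As a rough sketch of the notification mechanism described above, the following stand-alone function lets one thread wait until another thread has produced a value. The name wait_for_result and all local names are assumptions made for this illustration; they do not appear in the example class.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: a producer thread computes a + b and a consumer
// thread waits on a condition variable until the result is ready.
double wait_for_result(double a, double b) {
    std::mutex m;                 // shared by both threads
    std::condition_variable cv;
    bool ready = false;           // the condition the consumer waits for
    double shared_result = 0.0;
    double received = 0.0;

    std::thread consumer([&] {
        std::unique_lock<std::mutex> lock(m);
        // Sleeps until notified AND ready is true; the predicate also
        // guards against spurious wake-ups.
        cv.wait(lock, [&] { return ready; });
        received = shared_result;
    });

    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> lock(m);
            shared_result = a + b;
            ready = true;         // changed while holding the same mutex
        }
        cv.notify_one();          // wake the waiting consumer
    });

    producer.join();
    consumer.join();
    return received;
}
```

The flag is changed while the mutex is held, so the waiting thread cannot miss the update between checking the flag and going to sleep.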
Implementing Multi-Threaded Calculations with Promises
In this example a first thread carries out a calculation and hands the result to two threads stored in a vector; the result also replaces one number in a vector of data. Each thread in the vector carries out one calculation, and the two run simultaneously. The vector could hold as many threads as you like. A last thread then receives data from the two threads in the vector and from the first thread. The last thread carries out two calculations, and the results are stored in two promises. This is an example of concurrent code with multiple threads working together. The next section discusses main, and after that the class Thread_test is explained. All functions are described in the order in which they appear in the header file.
Main Function Breakdown: Futures and Shared Futures
In main there is a vector called vec_double with three numbers of type double. They are arbitrary numbers used in three of the four threads. There is also an integer, threads_in_veck, equal to 2. It gives the number of threads that run simultaneously in the vector of threads. A vector called veck_int of that size is filled with the numbers 0 and 1 using iota; each number serves as the index of one of the simultaneous threads.
Then an object of the class Thread_test is created with threads_in_veck as the constructor argument. The integer determines the size of the vector of promises and the vector of shared futures. These are set up in the member function Prom_Fut, one pair for each of the two simultaneous threads. A shared_future gives access to the result stored in a thread's promise. It is used instead of a plain future because the same result is read more than once and in more than one thread.
The function countAdd is executed in the first thread, called first. Its two arguments are taken from vec_double. The two simultaneous threads are stored in the vector called threads and execute countMulti; each receives one number from vec_double and one index from veck_int. Numbers from vec_double are also used in the last thread, called last, which executes the function countDiv.
In the function Prom_Fut() each promise is tied to its future, so that the value set in one thread can be read outside that thread or in another thread.
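The link between a promise and a shared future can be illustrated with a small sketch. The function sum_seen_by_readers and its parameters are hypothetical names chosen for this example. One promise is fulfilled once, and several reader threads obtain the same value through copies of one shared_future.

```cpp
#include <future>
#include <thread>
#include <vector>

// Hypothetical helper: n_readers threads all read the same value from
// copies of one shared_future; the function returns the sum of what
// they saw.
double sum_seen_by_readers(double value, int n_readers) {
    std::promise<double> prom;
    // share() turns the one-shot future into a copyable shared_future.
    std::shared_future<double> fut = prom.get_future().share();

    std::vector<double> seen(n_readers, 0.0);
    std::vector<std::thread> readers;
    for (int i = 0; i < n_readers; ++i) {
        // Each thread gets its own copy of fut and its own slot in seen.
        readers.emplace_back([fut, &seen, i] { seen[i] = fut.get(); });
    }

    prom.set_value(value);  // unblocks every reader waiting in get()
    for (auto& r : readers) r.join();

    double sum = 0.0;
    for (double s : seen) sum += s;
    return sum;
}
```

A plain std::future could not be used this way, because it cannot be copied and its get function may only be called once.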
The thread first and the two threads in the vector are launched and later joined; the thread last is launched and joined after that. Finally, the function print writes to the screen the two numbers calculated by countDiv in the thread last.
C++ Code Implementation: Main.cpp
The code for main is shown below.
#include <functional>
#include <iostream>
#include <mutex>
#include <future>
#include <thread>
#include <vector>
#include <numeric>
#include "Thread_test.h"
using namespace std;

int main()
{
    // Input data used by the threads.
    vector<double> vec_double{ 47.35, 2152.04, 18.02 };
    cout << "This number in vector: " << vec_double[2] << endl;

    // Number of threads that run simultaneously in the vector of threads.
    int threads_in_veck = 2;
    vector<int> veck_int(threads_in_veck);
    iota(begin(veck_int), end(veck_int), 0);  // 0, 1: one index per thread

    Thread_test thread_Obj(threads_in_veck);
    thread_Obj.Prom_Fut();                    // tie each promise to its shared future

    // First thread: adds vec_double[0] and vec_double[2].
    thread first(&Thread_test::countAdd, &thread_Obj, ref(vec_double[0]), ref(vec_double[2]));
    // Blocks until countAdd has set its promise, then replaces the last element.
    vec_double[2] = thread_Obj.countAdd_Vec();
    cout << "This number in vector: " << vec_double[2] << endl;

    // Two threads that execute countMulti simultaneously.
    vector<thread> threads;
    for (int i = 0; i < threads_in_veck; i++)
        threads.push_back(thread(&Thread_test::countMulti, &thread_Obj, ref(vec_double[i]), ref(veck_int[i])));

    first.join();
    for (int j = 0; j < threads_in_veck; j++)
        threads[j].join();

    // Last thread: launched only after all other threads have been joined.
    thread last(&Thread_test::countDiv, &thread_Obj, ref(vec_double[1]), ref(vec_double[2]));
    last.join();

    thread_Obj.print();
}
Deep Dive into the Thread_test Class Logic
Below is the cpp file.
#include "Thread_test.h"

Thread_test::Thread_test(int n_threads) : N_thread(n_threads) {}

// Create one promise and one shared future per simultaneous thread.
void Thread_test::Prom_Fut() {
    prom_Mult_Vec.resize(N_thread);
    fut_Mult_Vec.resize(N_thread);
    for (int i = 0; i < N_thread; i++)
        fut_Mult_Vec[i] = prom_Mult_Vec[i].get_future();
}

// Runs in the thread "first": adds two numbers and publishes the result.
void Thread_test::countAdd(double add_Nr_1, double add_Nr_2) {
    // Note: mut is local to this call, so no other thread can lock it.
    // Real mutual exclusion needs one mutex shared by the threads,
    // for example a data member of the class.
    mutex mut;
    lock_guard<mutex> Addlock(mut);
    count_Add = add_Nr_1 + add_Nr_2;
    prom_Add.set_value(count_Add);  // makes the result available through fut_Add
    count_add_ready = true;
    condit.notify_all();            // wake every thread waiting on condit
}

// Returns the result of countAdd; blocks until the promise has been set.
double Thread_test::countAdd_Vec() const {
    double number = fut_Add.get();
    return number;
}

// Runs simultaneously in the threads of the vector.
void Thread_test::countMulti(double number, int veck_int) {
    mutex mut;  // local mutex, see the note in countAdd
    unique_lock<mutex> Multilock(mut);
    condit.wait(Multilock, [this] { return count_add_ready; });
    count_Mult = (fut_Add.get() * fut_Add.get()) - number;
    cout << " Answer " << count_Mult << " thread nr.= " << veck_int << endl;
    prom_Mult_Vec[veck_int].set_value(count_Mult);  // one promise per thread index
    Multilock.unlock();
    count_multi_ready = true;
    condit.notify_one();
}

// Runs in the thread "last": combines the results of all earlier threads.
void Thread_test::countDiv(double number_1, double number_2) {
    mutex mut;  // local mutex, see the note in countAdd
    unique_lock<mutex> Divlock(mut);
    condit.wait(Divlock, [this] { return count_add_ready; });
    condit.wait(Divlock, [this] { return count_multi_ready; });
    for (int k = 0; k < N_thread; k++)
        count_Div_Div += number_2 + (number_1 * fut_Add.get()) / fut_Mult_Vec[k].get();
    count_Div_Add = number_1 + fut_Add.get();
    prom_Mult_1.set_value(count_Div_Div);
    prom_Mult_2.set_value(count_Div_Add);
    Divlock.unlock();
}

// Prints the two results stored by countDiv.
void Thread_test::print() const {
    cout << "The first value in main is: " << futDiv_1.get() << "." << "\n"
         << "The second value in main is: " << futDiv_2.get() << endl;
}
This section gives an example of how a class can be built up so that its threads operate together. The functions are described in the order in which they are written.
1. Resizing Vectors for Parallel Execution
The first function is called Prom_Fut. It resizes two vectors, one with promises and one with shared futures, so that each holds one element for every thread that runs in parallel, and it ties each promise to its shared future.
2. Implementing Mutex and Lock_Guard in countAdd
The second function is called countAdd and it is executed in the thread called first. The only calculation is that two numbers are added: the first and the last number in the vector vec_double. Both numbers are also used in other threads. The operation is very simple, but the remaining lines are related to concurrency and are a bit more complicated. In the first line of the function a mutex is created. Mutex stands for mutual exclusion, and its purpose is to make sure that only one thread at a time can access the data needed in the calculation. In the next line a lock_guard object called Addlock is constructed with the mutex as its constructor argument. The lock_guard locks the mutex on construction and releases it on destruction at the end of the scope. Between those two points, any other thread that tries to lock the same mutex is blocked. Note that this only has an effect when the threads share one mutex object. A mutex declared inside the function, as here, is a new object in every call, so a shared mutex (for example a data member of the class) would be needed to actually keep other threads out.
In the next line the calculation is carried out and the result is stored in the variable count_Add. After that the result is stored in the promise object prom_Add by calling the function set_value.
Among the private data members there is a bool count_add_ready, initialised to false. It is now set to true. In the last line the condition variable condit, also a private data member, is used. One of the member functions of a condition variable is notify_all(), which wakes up all threads that are currently waiting on that condition variable. In this example the two threads in the vector wait for the result. The main thread also waits for it, through the future in countAdd_Vec, and the thread last uses it later.
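The following is a minimal sketch of how the same pattern could look when the mutex is a data member that all threads share. The class Adder_sketch and the function count_woken are hypothetical names introduced only for this illustration; they are not part of Thread_test.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical class: the mutex, the flag and the condition variable are
// all data members, so every thread works with the same objects.
class Adder_sketch {
public:
    void add(double a, double b) {
        {
            std::lock_guard<std::mutex> lock(mut_);
            sum_ = a + b;
            ready_ = true;
        }                      // mutex released before notifying
        cond_.notify_all();    // wake every waiting thread
    }

    double wait_for_sum() {
        std::unique_lock<std::mutex> lock(mut_);
        cond_.wait(lock, [this] { return ready_; });
        return sum_;
    }

private:
    std::mutex mut_;
    std::condition_variable cond_;
    bool ready_ = false;
    double sum_ = 0.0;
};

// Hypothetical helper: starts n_waiters threads that wait for the sum
// and counts how many of them received the expected value.
int count_woken(int n_waiters) {
    Adder_sketch adder;
    std::atomic<int> woken{0};
    std::vector<std::thread> waiters;
    for (int i = 0; i < n_waiters; ++i)
        waiters.emplace_back([&adder, &woken] {
            if (adder.wait_for_sum() == 3.0) ++woken;
        });

    adder.add(1.0, 2.0);
    for (auto& w : waiters) w.join();
    return woken.load();
}
```

Because the predicate is checked under the shared mutex, a waiter that starts after add has already run sees ready_ set to true and returns at once instead of sleeping forever.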
3. Modifying Shared Data in Vectors
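The result from a thread can be used to change the data in the vector, and this is an example of that. The third function, countAdd_Vec, returns the result of countAdd by calling get on the shared future fut_Add, which blocks until the value has been set. In main the returned value replaces the last value in the vector vec_double.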
4. Handling Unique_Lock and Wait Conditions in countMulti
It is time to look at the fourth function, countMulti, in the cpp file. It performs a very simple mathematical operation, but that is not the main point. The main point is how the function is built up. It is executed simultaneously in two threads stored in a vector of threads. The intention is that they do not access the shared data simultaneously. In the first line inside countMulti a mutex called mut is created, and then a unique_lock called Multilock is created with the mutex as its argument. A unique_lock is used here because the wait function of a condition variable requires one: wait must be able to unlock the mutex while the thread sleeps and lock it again when the thread wakes up. While the lock is held, the thread has exclusive access to the data protected by the mutex, and other threads that try to lock the same mutex are blocked until it is released. The lock is released automatically when it goes out of scope, or earlier if unlock is called explicitly. In this case that happens at the line Multilock.unlock().
In the next line the condition variable is used through the call condit.wait. The wait function checks the condition given by the supplied lambda function. If the condition is false, the thread sleeps until it is notified and then checks again. When the bool variable count_add_ready in the lambda is true, wait returns and the data can be received.
The thread now holds the lock and knows that the data from the previous thread is ready to be used. The calculation is carried out in the line below, and the data is acquired through the shared future fut_Add. In the next line the result of the calculation is written to the screen. Then the function set_value() is called on the promise prom_Mult_Vec[veck_int] to store the result. In the next line the unique_lock is unlocked, and in the following line the bool variable count_multi_ready is set to true. In the last line the member function notify_one is called on the condition variable, condit.notify_one(). It wakes up one thread that is waiting on the condition variable.
The two threads run simultaneously. The locks are meant to let only one of them access the shared data at a time. This requires the threads to lock the same mutex object, which a mutex declared inside the function does not provide. A condition_variable also requires that all threads waiting on it at the same time use the same mutex. The data used in the calculation comes from the vector vec_double, from the vector veck_int, and from the thread first. The number from veck_int makes sure that every thread gets its own promise in which to store its value. The notify_all call in the first thread wakes up the two parallel threads. There could be more parallel threads; a thread that did not wait on the condition variable condit would not be woken up to receive data from the first thread. The vectors of promises and futures must be at least as large as the number of threads that perform the task. If they are not, an exception should be thrown. That is not handled in the code.
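One possible way to add the missing size check is sketched below. The function store_result is a hypothetical helper, not part of Thread_test, and the choice of std::out_of_range is an assumption about which exception would be suitable.

```cpp
#include <cstddef>
#include <future>
#include <stdexcept>
#include <vector>

// Hypothetical helper: stores a value in the promise that belongs to a
// given thread index, and throws if no such promise exists.
void store_result(std::vector<std::promise<double>>& promises,
                  int index, double value) {
    if (index < 0 || static_cast<std::size_t>(index) >= promises.size())
        throw std::out_of_range("no promise for this thread index");
    promises[index].set_value(value);
}
```

If such a function is called inside a thread, the exception has to be caught inside that same thread, because an exception that escapes a thread function ends the program through std::terminate.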
5. Managing Final Thread Operations in countDiv
The fifth function is called countDiv and it is executed in the thread called last. It is structured in the same way as countMulti, with a few exceptions. The thread acquires data from all previous threads. Because of that it waits twice on the condition variable, once for count_add_ready and once for count_multi_ready, so that all necessary data is available. There are also two promises in the function, prom_Mult_1 and prom_Mult_2, which store the results that are later written to the screen.
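As a variation on the two waits described above, both conditions can be checked in a single predicate, and a counter can replace the bool so that the waiting thread continues only when every worker has finished. This is a sketch under those assumptions; the function wait_for_all_workers and its local names are hypothetical.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: a final thread waits until a first step is done
// AND all n_workers worker threads have reported completion.
int wait_for_all_workers(int n_workers) {
    std::mutex m;
    std::condition_variable cv;
    bool first_done = false;
    int workers_done = 0;
    int observed = 0;

    std::thread last([&] {
        std::unique_lock<std::mutex> lock(m);
        // One predicate covers both conditions.
        cv.wait(lock, [&] { return first_done && workers_done == n_workers; });
        observed = workers_done;
    });

    std::vector<std::thread> workers;
    for (int i = 0; i < n_workers; ++i)
        workers.emplace_back([&] {
            {
                std::lock_guard<std::mutex> lock(m);
                ++workers_done;   // report completion under the lock
            }
            cv.notify_all();
        });

    {
        std::lock_guard<std::mutex> lock(m);
        first_done = true;
    }
    cv.notify_all();

    for (auto& w : workers) w.join();
    last.join();
    return observed;
}
```

With a single bool, the first worker to finish would already satisfy the condition; the counter makes the waiting thread continue only after the last worker has reported.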
6. Retrieving and Printing Future Results
The sixth function, print, writes to the screen the two numbers calculated in the thread last. The two numbers are retrieved from the two shared futures futDiv_1 and futDiv_2 with the get function.
Complete C++ Header and Source Code
The class Thread_test
The header file is shown below.
#pragma once
#include <iostream>
#include <mutex>
#include <future>
#include <thread>
#include <vector>
#include <condition_variable>
using namespace std;

class Thread_test
{
public:
    Thread_test(int);
    void Prom_Fut();
    void countAdd(double, double);
    double countAdd_Vec() const;
    void countMulti(double, int);
    void countDiv(double, double);
    void print() const;

private:
    int N_thread{ 0 };  // number of threads in the vector of threads

    promise<double> prom_Add, prom_Mult_1, prom_Mult_2;
    vector<promise<double>> prom_Mult_Vec;
    vector<shared_future<double>> fut_Mult_Vec;

    double count_Div_Div{ 0 }, count_Div_Add{ 0 }, count_Mult{ 0 }, count_Add{ 0 };

    // Shared futures, because each result is read more than once.
    shared_future<double> fut_Add = prom_Add.get_future();
    shared_future<double> futDiv_1 = prom_Mult_1.get_future();
    shared_future<double> futDiv_2 = prom_Mult_2.get_future();

    condition_variable condit;
    bool count_add_ready = false;
    bool count_multi_ready = false;
};
Final Thoughts on Thread Initialization and Scaling
The only value initialised in the constructor is the number of threads in the vector, which is two here. This number is also used in the for-loop in the countDiv function. Values that configure the class rather than serve as data in the calculations are initialised in the constructor, while the data used in the calculations are passed as arguments to the functions executed in the threads. In this code there are four threads, and two of them are stored in the vector. The vector can be made much larger to hold a large number of threads. In this case the last thread receives data from both threads in the vector. That does not have to be the case. But if you want several later threads to receive data from some threads in the vector and not from others, a scheme is needed. One idea is to let one particular thread receive data from every odd-numbered thread in the vector.
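The odd-numbered idea could be sketched like this. The function sum_odd_indexed is a hypothetical helper, and adding the values together is only a placeholder for whatever the receiving thread would do with them.

```cpp
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical helper: reads the results of the odd-numbered threads
// only (index 1, 3, 5, ...) and adds them together. Each get() blocks
// until the corresponding promise has been fulfilled.
double sum_odd_indexed(const std::vector<std::shared_future<double>>& futures) {
    double sum = 0.0;
    for (std::size_t i = 1; i < futures.size(); i += 2)
        sum += futures[i].get();
    return sum;
}
```

A function like this could be executed in a thread of its own, while another thread collects the even-numbered results in the same way.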