Cpp chronicle #1

Joinable and interruptible thread in C++20

What are some of the problems with std::thread?

The following program terminates abnormally and dumps core.

/* 
gcc version 10.0.1 20200314 (experimental) (GCC)
Ubuntu 18.04.5 LTS
x86_64 GNU/Linux 5.6.2
*/
#include <thread>
#include <iostream>

int main() {
    std::thread t{[]() {
        std::cout <<"Hello from thread t"<<std::endl;
    }};
}


The reason for the core dump is std::terminate() called by the destructor of the std::thread object t.

// from std::thread header
~thread()
{
    if (joinable())
        std::terminate();
}

The default terminate handler calls std::abort(), which is what produces the core dump.

Therefore the above code can be fixed by calling join() or detach() on the std::thread object t.

#include <thread>
#include <iostream>

int main() {
    std::thread t{[]() {
        std::cout <<"Hello from thread t"<<std::endl;
    }};

    t.join();
    std::cout << (t.joinable()?"joinable":"joined already")
        <<std::endl;
}
/* output:
Hello from thread t
joined already
*/

That looks fine. But there is still a problem: an uncaught exception might occur before t.join(). For example,

#include <string>
#include <thread>
#include <iostream>

int main() {
    std::thread t{[]() {
        std::cout <<"Hello from thread t"<<std::endl;
    }};

    /*this will throw an std::out_of_range*/
    std::string("abc").at(5); 

    t.join();
    std::cout << (t.joinable()?"joinable":"joined already")
        <<std::endl;
}
/* output:
terminate called without an active exception
[1]    13976 abort (core dumped)  ./a.out
*/

In the above example, the exception is not caught, so std::terminate is called and t.join() is never reached. We can fix this with a try/catch block.

#include <string>
#include <thread>
#include <iostream>
#include <exception>

int main() {
    std::thread t{[]() {
        std::cout <<"Hello from thread t"<<std::endl;
    }};

    try {

        /* do other things and it might cause exception */

        std::string("abc").at(5);
    
    }catch(...) { /* catches everything; std::out_of_range derives from std::exception */
        auto ePtr = std::current_exception();
        try {
            if (ePtr)
                std::rethrow_exception(ePtr);

        } catch(const std::exception& e) {
            std::cout << std::endl
                << "Caught exception \"" << e.what() << "\"\n";
        }
    }

    t.join();
    std::cout << (t.joinable()?"joinable":"joined already")
        <<std::endl;
}

/*
Hello from thread t

Caught exception "basic_string::at: __n (which is 5) >= this->size() (which is 3)"
joined already
*/

Therefore, with std::thread we must take care that join() is called properly on every path.
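
One way to guarantee the join on every exit path, without wrapping the whole body in try/catch, is a small RAII wrapper. The following is only a sketch: JoinGuard and worker_finishes_despite_exception are hypothetical names chosen for illustration, not standard library facilities.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical helper: joins the wrapped std::thread in its destructor,
// so the join also happens while the stack unwinds after an exception.
class JoinGuard {
public:
    explicit JoinGuard(std::thread t) : t_{std::move(t)} {}
    ~JoinGuard() {
        if (t_.joinable())
            t_.join();
    }
    JoinGuard(const JoinGuard&) = delete;
    JoinGuard& operator=(const JoinGuard&) = delete;

private:
    std::thread t_;
};

// Returns true if the worker ran to completion even though an exception
// was thrown before any explicit join().
bool worker_finishes_despite_exception() {
    bool worker_ran = false;
    try {
        JoinGuard guard{std::thread{[&worker_ran] { worker_ran = true; }}};
        std::string("abc").at(5);  // throws std::out_of_range
    } catch (const std::out_of_range&) {
        // guard was destroyed during unwinding, so the thread is joined here
        assert(worker_ran);
    }
    return worker_ran;
}
```

Calling worker_finishes_despite_exception() from main() should return true without any call to std::terminate, because the guard's destructor joins the thread before the handler runs.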


Another basic capability missing from the std::thread class is interruption. std::thread does not provide a direct way of stopping or interrupting a running thread. A programmer has to build his/her own signaling mechanism (for example using std::future/std::promise) to achieve thread interruption.
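
To make the cost of such a hand-rolled mechanism concrete, here is a minimal sketch of a std::promise/std::future based stop signal for a plain std::thread. The function name run_until_signalled and the chosen intervals are assumptions made for this illustration.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Returns the number of loop iterations the worker completed before it
// observed the hand-rolled stop signal.
int run_until_signalled() {
    std::promise<void> stop_signal;                        // owned by the caller
    std::future<void> stop_future = stop_signal.get_future();
    int iterations = 0;

    std::thread worker{[&iterations, fut = std::move(stop_future)] {
        // wait_for() reports timeout until set_value() has been called
        while (fut.wait_for(std::chrono::milliseconds(10)) ==
               std::future_status::timeout) {
            ++iterations;  // stands in for one unit of real work
        }
    }};

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    stop_signal.set_value();  // ask the worker to stop
    worker.join();            // still mandatory with std::thread
    assert(!worker.joinable());
    return iterations;
}
```

The promise, the future, the polling loop and the join all have to be written by hand here, and a std::promise can only be set once.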

std::jthread

std::jthread (C++20) solves the std::terminate problem by calling join() in ~jthread(). We don't need to worry about joining a std::jthread. The following is a basic example using std::jthread.

#include <thread>
#include <iostream>

int main() {
    std::jthread t{[]() {
        std::cout <<"Hello from thread t"<<std::endl;
    }};

    std::cout << (t.joinable()?"joinable":"joined already")
        <<std::endl;
}
/*
from std::jthread class

~jthread()
{
    if (joinable())
    {
        request_stop(); // (?) discussed below
        join();
    }
}

output:
Hello from thread t
joinable

Or

joinable
Hello from thread t
*/

As we can see, the destructor of a std::jthread calls join() if the thread is still joinable. We can also notice a call to request_stop(). This request_stop() is part of a new concept introduced in C++20 for co-operative cancellation of threads (std::thread or std::jthread). In fact, this co-operative cancellation mechanism is a feature independent of std::thread. We will see how this feature is embedded in the std::jthread class.

Basically, in C++20, we have a new header called stop_token. It provides three entities:

  1. stop_source
  2. stop_token
  3. stop_callback

In brief:

  1. stop_source : used to create stop request
  2. stop_token : client side of the stop_source, which can see the stop request
  3. stop_callback : callback registration feature, registered callbacks are invoked when a stop is requested

Let us see, how to use these three in the following example.

Example1.
#include <mutex>
#include <chrono>
#include <thread>
#include <iostream>
#include <stop_token>

int main() {

    /*we create a stop_source first
    * this provides the means to issue a stop request to a running entity.
    * this stop_source object can create tokens.
    */
    std::stop_source stSrc;

    /* We take a stop_token from the created stop_source.
    * we can pass (by value) this token to thread or jthread
    */
    auto token = stSrc.get_token();

    /*this std::thread taking int and a stop_token*/
    std::thread t{[](int x,std::stop_token token) {
        bool done = false;
        while(!done) {
            std::cout <<"["<<x<<"] Hello from inside of the std::thread "<<std::endl;
            /* stop_token has a method called stop_requested()
            * it returns true when request_stop() is called on its associated
            * stop_source object.
            * It means, whenever request_stop() is called on a stop_source object
            * this request becomes visible to all the tokens taken from the same 
            * stop_source object.
            */
            if(token.stop_requested()) {
                std::cout <<"This thread work is done!" << std::endl;
                done = true;
            }
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }

    },10,token}; /*pass the token to the thread function (or client)*/


    // lets ask to stop the thread t
    std::this_thread::sleep_for(std::chrono::seconds(2));
    /* as explained above, we are going to call request_stop() on the
    * source object. The std::thread t can detect this request and 
    * complete its execution.
    */
    stSrc.request_stop();
    std::cout <<"stop requested from main" << std::endl;

    t.join();
}

This is co-operative cancellation, because thread t need not necessarily check stop_requested(), and even if it does, how it reacts to the stop request is entirely up to thread t. In the above example, we just return from the function; we could have done something else.

We can pass the token to any number of threads/jthreads and call request_stop() on the source object. A stop_token has value semantics: we can simply pass it by value, and every copy still observes a stop request made on the source. Move semantics are enabled as well, but a moved-from token no longer shares the stop state. So if we std::move(token) somewhere else and then pass the moved-from token to a thread, that thread will never see a stop request made on the associated stop_source object. For example,

#include <chrono>
#include <thread>
#include <utility>
#include <iostream>
#include <stop_token>

int main() {
    std::stop_source stSrc;
    auto token = stSrc.get_token();
    auto token2 = std::move(token); // moved
    
    std::thread t{[](int x,std::stop_token token) {
        bool done = false;
        while(!done) {
            std::cout <<"["<<x<<"] Hello from inside of the std::thread "<<std::endl;
            if(token.stop_requested()) { /*checking or polling*/
                std::cout <<"This thread work is done!" << std::endl;
                done = true;
            }
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }

    },10,token}; /*token is a moved object now*/

    // lets ask to stop the thread t
    std::this_thread::sleep_for(std::chrono::seconds(2));
    stSrc.request_stop();
    std::cout <<"stop requested from main" << std::endl;
    
    
    t.join(); /* we are using thread for now */
}

The above program will print Hello from inside of the std::thread forever, because stop_requested() on the moved-from token always returns false.

All the copy and move operations are enabled for std::stop_token class.

// from stop_token header
/*copy constructor*/
stop_token(const stop_token&) noexcept = default;
/*move constructor*/
stop_token(stop_token&&) noexcept = default;
/*copy assignment*/
stop_token& operator=(const stop_token&) noexcept = default;
/*move assignment*/
stop_token& operator=(stop_token&&) noexcept = default;

Before going back to std::jthread, let's also discuss the stop_callback feature.

We can register a callback function on a stop_token. When request_stop() is called on the associated stop_source object, all the callbacks registered on the tokens of this source will be invoked. For example,

#include <thread>
#include <iostream>
#include <stop_token>

#define Debug(arg) std::cout <<"Tid[" \
    <<std::this_thread::get_id()<<"]" \
    <<" "<<arg <<std::endl;

int main() {
    std::stop_source stSrc;
    auto token = stSrc.get_token();
    
    std::stop_callback cb1{token,[&]() {
        Debug("Callback-1");
    }};

    std::stop_callback cb2{token,[&]() {
        Debug("Callback-2");
    }};

    stSrc.request_stop();

    Debug("Stop requested from main");
}
/* output:
Tid[140537881631744] Callback-2
Tid[140537881631744] Callback-1
Tid[140537881631744] Stop requested from main
*/

The registered callback functions run either in the thread that calls request_stop() or in the thread that constructs the stop_callback. The second case happens when request_stop() has already been called before the callback is constructed; the callback then runs immediately inside the constructor.

In the above example, we only have a single thread context, which is the main thread, therefore the output shows only one Thread Id value.

Let's take a few other examples of the use cases of stop_token and stop_callback.

Example2:
#include <chrono>
#include <thread>
#include <future>
#include <iostream>
#include <stop_token>
#include <condition_variable>
#define Debug(arg) std::cout <<"Tid[" \
    <<std::this_thread::get_id()<<"]" \
    <<" "<<arg <<std::endl;


int main() {
    std::stop_source stSrc;
    auto token = stSrc.get_token();

    std::thread t{[](int x,std::stop_token token) {
        std::promise<void> p;        
        auto fu = p.get_future();

        std::stop_callback cb{token,[&]() {
            // runs in the main thread context
            Debug("Promise Value Set");
            p.set_value();
        }};
        
        Debug("Sleeping...");
        /*this thread will wait on the future received from the promise
        * main function will call request_stop after 2 sec
        * therefore callback function will also run after 2 sec
        * promise will set the value
        * the wait on the future will end and this thread will wake up after 2 sec
        */
        fu.wait_for(std::chrono::seconds(10));
        Debug("Woke up...");
        if(token.stop_requested()) {
            Debug("Returning...");
            return;
        }

    },10,token};

    // lets ask to stop the thread t
    std::this_thread::sleep_for(std::chrono::seconds(2));
    stSrc.request_stop();

    Debug("Stop requested from main");
    
    t.join();
}

In the above example, the thread is stopped in an event-driven manner, instead of the polling mechanism shown in Example1.

More examples: The following example shows how we can pass the token around and stop multiple threads.

Example3:
#include <chrono>
#include <thread>
#include <iostream>
#include <stop_token>
#include <future>
#define Debug(arg) std::cout <<"Tid[" \
    <<std::this_thread::get_id()<<"]" \
    <<" "<<arg <<std::endl;

int main() {
    std::stop_source stSrc;
    auto token = stSrc.get_token();

    std::thread t{[](int x,std::stop_token token) {
        std::promise<void> p;        
        auto fu = p.get_future();

        std::stop_callback cb{token,[&]() {
            // either runs in the main's context or this thread context
            Debug("Promise Value Set");
            p.set_value();
        }};
        
        Debug("Sleeping...");

        fu.wait_for(std::chrono::seconds(10));
        
        if(token.stop_requested()) {
            return;
        }

    },10,token};

    std::thread t1{[](int x,std::stop_token token) {
        std::promise<void> p;        
        auto fu = p.get_future();

        std::stop_callback cb{token,[&]() {
            // either runs in the main's context or this thread context
            Debug("Promise Value Set");
            p.set_value();
        }};
        
        Debug("Sleeping...");

        fu.wait_for(std::chrono::seconds(10));
        
        if(token.stop_requested()) {
            return;
        }

    },10,token};

    // lets ask to stop the thread t
    std::this_thread::sleep_for(std::chrono::seconds(2));
    stSrc.request_stop();

    Debug("Stop requested from main");
    
    t.join();
    t1.join();
}
/*output:
Tid[139898110605056] Sleeping...
Tid[139898102212352] Sleeping...
Tid[139898198559872] Promise Value Set
Tid[139898198559872] Promise Value Set
Tid[139898198559872] Stop requested from main
*/

In C++20 a new set of wait APIs has been added to the std::condition_variable_any class, which can respond to stop_tokens. In the following example, the wait_for() API is used on std::condition_variable_any. This program has a similar effect to the previous programs.

Example4.
#include <mutex>
#include <chrono>
#include <thread>
#include <iostream>
#include <stop_token>
#include <condition_variable>

#define Debug(arg) std::cout <<"Tid[" \
    <<std::this_thread::get_id()<<"]" \
    <<" "<<arg <<std::endl;

int main() {
    std::stop_source stSrc;
    auto token = stSrc.get_token();
    std::thread t{[](int x,std::stop_token token) {
        std::mutex m;
        std::condition_variable_any cv;
        std::unique_lock lock{m};
        Debug("Sleeping...");
        
        /*a new wait_for() api which can respond to stop_token
        * Therefore, this thread will sleep for either
        * 10 seconds or until a stop request has been received.
        */
        cv.wait_for(lock,token,std::chrono::seconds(10),[]{
            return false;
        });
        
        Debug("Returning ...");

    },10,token};
    
    
    // lets ask to stop the thread t
    std::this_thread::sleep_for(std::chrono::seconds(2));
    stSrc.request_stop();

    Debug("Stop requested from main");    
    t.join();
}

Let's talk about std::jthread again. As we can see, all of the above examples on stop_token use only std::thread. Therefore we had to explicitly create a stop_source, extract tokens from it and pass them to the thread functions.

In the case of std::jthread, we do not necessarily need to create an explicit stop_source. The std::jthread class has a member stop_source object. The std::jthread class also has the request_stop() API.

#include <chrono>
#include <thread>
#include <iostream>
#include <stop_token>

int main() {
    /*A co-operative interruptible thread*/
    std::jthread t{[](std::stop_token token,int x) {
        bool done = false;
        while(!done) {
            std::cout <<"["<<x<<"] Hello from inside of the std::thread "<<std::endl;
            
            if(token.stop_requested()) {
                std::cout <<"This thread work is done!" << std::endl;
                done = true;
            }
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }

    },10};
 
   /* we don't need to pass the token, jthread constructor
    * will take care of it.
    * We can omit the token params also, if we don't want the thread function
    * to be interruptible
    */
    
    /*In the following thread function, token argument is skipped 
    * Reason being, t2 need not be interrupted from the application point of view
    */
    
    std::jthread t2{[](int x) {
        bool done = false;
        while(!done) {
            std::cout <<"["<<x<<"] Hello from inside of the std::thread2 "<<std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }

    },10};

    t.request_stop();
}
/*output:
[10] Hello from inside of the std::thread 
[10] Hello from inside of the std::thread2 
[10] Hello from inside of the std::thread 
This thread work is done!
[10] Hello from inside of the std::thread2 
[10] Hello from inside of the std::thread2 
[10] Hello from inside of the std::thread2 
[10] Hello from inside of the std::thread2
....
*/

In the above example, the first thread t stops because a stop_token is injected into its thread function, whereas the second thread t2 never stops.

Therefore the std::jthread class not only joins automatically but also supports co-operative interruption.
