Structured Concurrency

This article is a follow-up to the "Getting Rid of State Machines" blog posts, so you may want to read those first.

Before going on, let me return to the fundamentals and make it clear what this effort is all about.

As Joe Armstrong nicely puts it in his blog:

TCP uses the idea of a session and the only rational way to program a session is as a process or (horrors) as a thread. […] Session management involves mutable state concurrency. A program that is a dozen lines of Erlang escalates into a mess of locks and mutexes or callbacks which in most languages is a thin layer over a pthreads implementation. For those of you who haven’t written a multi-threaded TCP socket server in C using pthreads I can only say “don’t go there, it’s not a pleasant experience” I’ve been there done that, and have the grey hairs to prove it.

What's more to say? I have those grey hairs as well.

If you are not familiar with the problem, imagine you want to implement a simple framing protocol on top of TCP. The protocol is a simple sequence of messages, each message consisting of a 32-bit size field in network byte order followed by the data. How hard can it be to implement such a simple specification?

It turns out it can take months. Most of the time will be spent on dealing with switching between concurrently opened TCP connections, on error handling and on managing the shutdown process.

In a sane world, a specification 30 words long, like the one above, would be implementable in 100 lines of C code. If it requires several thousand lines, there's clearly something broken in our programming model.
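
To get a sense of the proportions, here is a rough sketch of the per-connection receive logic, written against plain blocking POSIX sockets with error handling reduced to bailing out (recvall() and recv_frame() are made-up helper names, not part of any particular API). This part really is trivial; the months go into everything else: juggling many connections at once, error recovery and shutdown.

#include <arpa/inet.h>   /* ntohl */
#include <stdint.h>
#include <stdlib.h>
#include <sys/socket.h>  /* recv */

/* Read exactly 'len' bytes, coping with short reads. Returns 0 on success, -1 on error/EOF. */
static int recvall(int s, void *buf, size_t len) {
    char *pos = buf;
    while(len > 0) {
        ssize_t n = recv(s, pos, len, 0);
        if(n <= 0) return -1;
        pos += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Receive one message: a 32-bit size field in network byte order followed by the data.
   (Zero-length messages are left as an exercise.) */
static void *recv_frame(int s, size_t *sz) {
    uint32_t hdr;
    if(recvall(s, &hdr, sizeof(hdr)) != 0) return NULL;
    *sz = ntohl(hdr);
    void *buf = malloc(*sz);
    if(!buf) return NULL;
    if(recvall(s, buf, *sz) != 0) { free(buf); return NULL; }
    return buf;
}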

As I've argued in part I of this essay, to get a simple programming model you have to spawn a thread per TCP connection. But given that threads are expensive (and processes even more so) and that sharing state between threads is insane, bordering on suicidal, what you need are lightweight green threads and a simple communication mechanism to send messages between them. In UNIX of old they were called processes and pipes. In Go they are called goroutines and channels. In Erlang they are known as processes and mailboxes.

However, as explained in part II of this essay, even with a CSP-like mechanism at hand, there's one broad set of use cases that cannot be easily solved: everything that has to do with shutdown and cancellation. The article also claims that if a language had green threads *and* a sane cancellation mechanism, it would allow for a purely imperative programming style without the need for state machines, i.e. without explicitly saving the state of the current computation, switching to something else, then restoring the state later on.

The rest of the article will explore the question of cancellation and will propose a model of "structured concurrency" to deal with it.

Structured concurrency is similar to structured programming as we know it from all modern programming languages. Structured programming prevents random jumping around the codebase and instead structures the program as a set of nested code blocks. The block lifetimes never overlap: if block B is nested in block A, the lifetime of B cannot exceed the lifetime of A. Similarly, structured concurrency prevents the lifetime of green thread B, launched by green thread A, from exceeding the lifetime of A:

Green thread quux() is launched by main() and finishes before main() does. The same applies to foo() and main(), as well as to bar() and foo().

Codewise, it could look like this:

void foo(void) {
    ...        // do stuff
    go(bar()); // launch new green thread
    ...        // do more stuff
}              // bar() is not running here any more

You can think of it as extending our current notion of the call stack to embrace concurrency. The call stack morphs into a call tree, but the nice properties are preserved: the stack is not unwound before the children are finished, and you can still walk it all the way up to the main() function.

A nice way to think about it is that green threads (like bar() in the example above) are just like local variables. Just as with variables, when the scope closes the green thread ceases to exist.

Green threads local to functions may not be the most common use case but they are extremely helpful when thinking about the problem. The conceptual advantage of this system is obvious: Unlike with goroutines in Go, the green thread bar() is fully local to foo(). It's encapsulated within it. The caller of foo() doesn't have to be aware of the existence of bar(). The implementor of foo(), on the other hand, can remove bar() or launch 100 instances of it and it won't affect the caller in the slightest.

But how would such system behave? What happens if bar() is still running when foo() returns to the caller?

Do we want foo() to block and wait for bar() to finish? Or do we want foo() to forcefully cancel bar() before it exits?

In real-world cases we want a mixture of both, but the trivial case is the latter: imagine that bar() is some kind of generator running in an infinite loop. If foo() waited for it to finish, it would be stuck forever.

Let's think about how to do the cancellation then. The good news is that because of the cooperative scheduling of green threads the code already has to be split into reasonably sized chunks. If it was not, one green thread would be able to block other green threads for extended periods of time. And given that the chunks never take long to execute, we don't have to cancel the thread at a random point of execution. We are perfectly happy to let the currently running chunk finish and cancel the thread once it's done.

In terms of API it means that, unlike with POSIX signals where a signal can arrive at the most inconvenient point in time, we have to care about cancellation only when calling functions that can cause a switch to a different green thread, in other words, all blocking functions.

If we repurposed the error code ECANCELED to signal that the parent function is cancelling the current green thread, the code could look like this:

ssize_t rc = tcprecv(s, buf, len, 0);
if(rc < 0 && errno == ECANCELED)
    return;
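
To make this concrete, here is what a hypothetical worker written in this style could look like (sock and tcprecv() are the same imaginary API as in the snippet above). The only places where cancellation can be observed are the blocking calls; the processing in between needs no checks whatsoever:

void worker(sock s) {
    char buf[256];
    while(1) {
        ssize_t rc = tcprecv(s, buf, sizeof(buf), 0);
        if(rc < 0 && errno == ECANCELED)
            return;   // the parent is cancelling us; exit cleanly
        if(rc < 0)
            return;   // ordinary I/O error
        ...           // process the data; no cancellation can happen here
    }
}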

One observation to make here is that even with preemptive scheduling, where the code is not guaranteed to be structured in bite-sized pieces, it mostly happens to be. For example, as I am told, rump kernels expose a POSIX API, including pthreads, but actually run the program in a single thread, using system calls as switching points. And so far, it seems to work even for unmodified legacy applications.

The other part of the problem is harder though: How to initiate the cancellation from the parent function? We are facing quite a lot of distinct use cases here and the API is hard to get right.

What follows will be a bit C-specific. In fact, the C implementation already exists and you can play with it here.

Using C has both advantages and disadvantages. On the positive side, C's "you can do whatever you want but you have to do it by hand" ideology means that we don't have to extend the language itself and that we'll be able to accomplish everything we want by implementing a function. On the other hand, the fact that the user has to do cancellation manually obscures the fact that in higher-level languages the mechanism would be incorporated into the language itself. To address that, I'll briefly discuss other languages at the end of this article.

So, here we go.

First of all, if we want to cancel a green thread, we need a thread handle:

coro hndl = go(bar());

What that means is that the green thread cannot be fully deallocated when it finishes its execution. If it was, the caller would be left with an invalid handle and would face problems when trying to cancel the thread. This is in contrast to how Go deals with goroutines: goroutines run until they are finished and then cleanly deallocate themselves.

The question thus is: Are we missing an important use case by requiring green threads to be explicitly canceled?

As far as I can say, no. Goroutine cancellation in Go is typically performed by the goroutine's owner, using signalling via channels, and is semantically equivalent to the built-in cancellation proposed here, except more cumbersome. Some goroutines may be left running in an infinite loop just to get canceled when the process exits. These are equivalent to green threads launched in global scope, and it's really hard to think of a fitting use case for them. The key observation is that if a goroutine's lifetime is not synchronised with the lifetime of a different goroutine, it has no one to speak to (it doesn't even know who's running and who's not) and is thus quite useless. In short, the behaviour of Go-style green threads seems to be an artifact of Go's design rather than an expression of a concrete real-world use case.

Let's thus introduce the simplest possible API to cancel green threads. We are going to iterate on it to get something that may actually work in the real world.

void foo(void) {
    coro cr = go(bar());
    ... // do stuff
    gocancel(cr);
}

What's missing is a way to give bar() a grace period to shut down. If, for example, it was sending its arguments to the network we may want to give it a second to flush the data before exiting:

void foo(void) {
    coro cr = go(bar());
    ... // do stuff
    sleep(1);
    gocancel(cr);
}

The solution above works, but the problem is that foo() is blocked for one second whether bar() manages to exit immediately or not. We would rather have it finish immediately when possible and fall back to the grace period otherwise.

It seems that two signals have to be sent to the green thread being canceled. The first one says: "Cancel if you have nothing to do anyway." The second one says: "Cancel even if you are not yet finished."

If one thinks about what bar() would look like, it's not nice. Having to deal with the first signal, even in the cases where you don't want to exit straight away, results in the re-introduction of state machines and exactly the kind of convoluted code that we want to avoid.

I was stuck on this problem for quite a while, but in the end it turned out not to be a problem at all.

Consider what a typical implementation of bar() looks like:

void bar(sock s, chan ch) {
    while(1) {
        int val;
        size_t valsz = sizeof(val);
        chrecv(ch, &val, &valsz);       // get a message from the owner
        tcpsend(s, &val, sizeof(val));  // send it via TCP
    }
}

The thing is that the coroutine is stuck either when communicating with its owner (receiving from the channel) or when doing other work (sending data to the TCP connection in this case). In the former case we want it to terminate immediately: the owner is shutting it down, so no message is going to arrive anyway. In the latter case we want a grace period before the cancellation happens.

And it turns out that we actually do have two different communication mechanisms between the owner and the owned! One is the channel ch used to send messages to the worker thread. The other one is the special cancellation mechanism we are discussing in this article.

Let's introduce a function chdone(), equivalent to Go's close() function. It will cause any subsequent operation on the channel to return the EPIPE error.

Let's also add a deadline parameter to our gocancel() function. The cancellation of the child thread will begin only after the deadline expires. If, on the other hand, the child thread exits before the deadline, gocancel() will return straight away.

void bar(sock s, chan ch) {
    while(1) {
        int rc;
        int val;
        size_t valsz = sizeof(val);
        rc = chrecv(ch, &val, &valsz);
        if(rc == -1 && errno == EPIPE)
            return;
        rc = tcpsend(s, &val, sizeof(val));
        if(rc == -1 && errno == ECANCELED)
            return;
    }
}

void foo(void) {
    coro hndl = go(bar(s, ch));
    ... // do stuff
    chdone(ch);
    gocancel(hndl, now() + 1000);
}

Let's see how it works.

If bar() is stuck in chrecv(), in other words, if it is idling, chrecv() will return the EPIPE error immediately when the owner calls chdone(). bar() thus exits immediately.

If bar() is stuck in tcpsend(), i.e. if it is doing useful work, it won't be affected by chdone(). Afterwards, the parent's gocancel() will wait for up to 1000 milliseconds. During that grace period, nothing special happens in bar(): tcpsend() continues to send data. If it manages to send all the data, bar() proceeds to chrecv(), which immediately returns the EPIPE error (chdone() was already called by the parent), and bar() exits.

If tcpsend() doesn't manage to send all the data within the grace period, gocancel() cancels the thread, meaning that tcpsend() returns the ECANCELED error and bar() exits.

Let's consider the most complex possible case.

First, imagine that main() launches foo() which in turn launches bar(). After a while, foo() starts canceling bar() and gives it a grace period of 1000 milliseconds.

That's simple. Let's now complicate the scenario: imagine that main() starts canceling foo() while foo() is blocked in gocancel(), and gives it a grace period of 500 milliseconds.

In other words, the grace period given by main() to foo() expires before the grace period given by foo() to bar(). How do we even want the system to behave in such a case?

Well, we want foo() to be canceled after 500 milliseconds even though its child has a longer grace period. If it was not so, the encapsulation would not work: recall that we want main() to be fully agnostic about whether foo() uses bar() or not.

At the same time, we want bar() to finish before foo() because otherwise the lifetime of the child would exceed the lifetime of the parent.

The only possible solution is to shorten the grace period of bar() so that it expires when foo() is about to finish.

It can be seen in the picture below. When gocancel() invoked by foo() gets the cancellation signal from main(), it revokes the remaining part of the grace period (grey box) and sends the cancellation signal to bar() straight away. bar() thus finishes almost immediately. gocancel() in foo() is then free to exit. foo() finishes immediately afterwards and causes gocancel() in main() to exit. From the perspective of main(), foo() was canceled within 500 milliseconds, as expected.

API-wise, it's easy to implement. gocancel() itself is a blocking function and thus should return ECANCELED if the parent decides to cancel the current green thread. And that's exactly what it's going to do. The only thing to keep in mind is that gocancel() *does* cancel the child thread even if it returns the ECANCELED error. It never leaves the cancellation in a half-done state, even though it may shorten the grace period.
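
In code, assuming gocancel() reports the condition via the same -1/errno convention as the other blocking calls in these sketches, foo() can handle it with a single check:

void foo(void) {
    coro hndl = go(bar(s, ch));        // s and ch as in the earlier example
    ...                                // do stuff
    chdone(ch);
    int rc = gocancel(hndl, now() + 1000);
    if(rc == -1 && errno == ECANCELED)
        return;  // bar() is already gone; foo() itself is being cancelled, so just exit
}

Either way, by the time gocancel() returns, bar() is guaranteed to have finished.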

It is also worth mentioning that a shortened grace period doesn't break foo()'s expectations. The gocancel() call with a 1000 ms deadline can return after 500 ms anyway, provided bar() finishes its work by then. The case where it returns early because main() imposed a shorter grace period on foo() therefore doesn't have to be handled in any special way.

There's one more challenge for the gocancel() function: if we have 1000 child threads and give each of them a 30-second grace period, a sequential shutdown could take up to 30,000 seconds, i.e. more than 8 hours.

We want grace periods for all those threads to happen in parallel:

void foo(void) {
    coro hndls[1000];
    for(int i = 0; i != 1000; ++i)
        hndls[i] = go(bar());
    ... // do stuff
    gocancel(hndls, 1000, now() + 30000); // cancel all 1000 threads with a single shared deadline
}

By doing all the cancellation in parallel, function foo() will be delayed by at most 30 seconds.

We are almost done, but I would like to return to an assumption I made at the beginning of this article. I said that green threads are arranged in a tree-shaped super-stack. I also said that this may not even be the most common use case. However, we stuck with it because it made reasoning about the system relatively easy.

Let's now have a look at the other cases. Sometimes the lifespan of a green thread is not contained within the lifetime of a parent thread, but rather its ownership is shared among multiple threads. What then?

Well, I've already said that a green thread should behave like a variable which is eliminated once it goes out of scope. However, we can also allocate a variable on the heap! In that case the lifetime of the variable is not tied to any particular scope but is terminated by hand (the free() function). We can do the same with green threads.

Imagine an object that sends data to a TCP socket. We can open such an object using an open_connection() function and close it using a close_connection() function. It can contain a green thread that does the sending:

struct connection {
    coro hndl;
    int sock;
};

struct connection *open_connection(int sock) {
    struct connection *conn = malloc(sizeof(struct connection));
    conn->sock = sock;
    conn->hndl = go(connection_worker(conn));
    return conn;
}

void close_connection(struct connection *conn) {
    gocancel(&conn->hndl, 1, now() + 1000);
    free(conn);
}

That's it. The lifetime of the green thread is now managed manually by the owner(s) of the connection object.
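
As an illustration, the connection pointer, and with it the worker thread, can now live inside any long-lived data structure. The registry below, together with its add_peer() and drop_peer() functions and the MAX_PEERS constant, is purely hypothetical:

#define MAX_PEERS 64

struct registry {
    struct connection *peers[MAX_PEERS];
};

void add_peer(struct registry *r, int idx, int sock) {
    r->peers[idx] = open_connection(sock);  // the worker green thread starts here
}

void drop_peer(struct registry *r, int idx) {
    close_connection(r->peers[idx]);        // and is cancelled here, regardless of scope
    r->peers[idx] = NULL;
}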

Finally, I would like to say a few words about implementing structured concurrency in higher-level languages. These typically differ from C by managing the lifetimes of individual objects on the user's behalf: C++ does so using automatic destructors, most other languages by having a garbage collection mechanism. Given that structured concurrency is about lifetime management of green threads, in those languages it should become part of the language.

The most important question here is whether the cancellation can be done automatically, without any syntactic dead weight.

The equivalent of the gocancel() function can surely be invoked automatically when the instance of a green thread goes out of scope.

One interesting observation here is that it can't be implemented using C++ destructors (or their equivalents in other languages), because that would cancel the child threads one after another, in a sequential manner, instead of cancelling them in parallel. The mechanism has to be truly integrated into the language runtime.

What about deadlines though? gocancel() has a deadline parameter that can't be supplied by the user if the cancellation is invoked automatically.

My intuitive take (but I may be wrong) is that the length of the grace period should be decided by the implementor of the thread, who has the best idea of how long it makes sense to wait before killing it, and that it should therefore be part of the green thread definition, e.g.:

coroutine void foo(void) deadline(100) {
    ...
}

One may argue that the deadline could vary depending on, say, CPU speed and should therefore be set by the caller instead. However, if you look at the actual use cases, it turns out that a grace period is typically needed for interactions with the outside world, such as network I/O, user input and the like. Those operations, in turn, are independent of CPU speed.

In languages with exceptions, the ECANCELED error can be turned into an exception. One pitfall to avoid is that the cancellation itself can be cancelled. In such a case, the runtime should not try to raise an exception within an exception but rather keep propagating the original one up the stack. Once it unwinds the main function of the thread, it should be silently dropped.

Finally, integration with garbage-collected languages may be tricky. Surely we don't want a green thread, after it has gone out of scope, to be left running, waiting for the garbage collector to kill it. Once again, this looks like a feature that has to be integrated into the language runtime.

In conclusion, I would like to say that this is a problem I have struggled with for years, and this is the first time I feel I have a reasonable solution to it. Any feedback, criticism or simply ideas would be appreciated.

February 7th, 2016