Dataflow variables and streams
Introduction
The dataflow computational model is a relatively well-known concept (see Dataflow and related articles). You can find an excellent extended explanation and discussion in Concepts, Techniques, and Models of Computer Programming and other textbooks. With the advent of concurrent and distributed programming, dataflow programming is growing in popularity because it appears to be the only computational model which can scale well not only up (which most people currently pay attention to), but also down and everywhere in between. Some properties of this model, such as single assignment of variables, make it possible to completely avoid the most difficult challenges presented by other computational models in a distributed environment.
Basics
Imagine we want to compute some numeric values like 6 + 4 - 3 → 7 or 2 + 3 - 1 → 4. It's easy to see a pattern here. We can capture this pattern in the form of an expression or program like Z = A + B - C. Here we say that A, B and C are input variables or arguments and Z is an output variable or result. Once we have determined that, we can perform any number of computations for different sets of input values by evaluating or executing the expression for each given set of values. This is a classic case of program vs. data separation (i.e. the expression is the program and the values are the data).
Note that this is not the only way we can compute Z. We can also do it in two steps: T1: Y = A + B and T2: Z = Y - C. Now we have two expressions to evaluate, but each expression is simpler than the original one. Using decomposition like this we can reduce any expression of arbitrary complexity into a set of very simple ones. This is an example of equivalent program transformation and is the basis of all modern programming language compilers.
The next thing to note is that these sub-expressions must be evaluated in a certain order. Indeed, since the value of Y is not known until T1 has been evaluated, we cannot evaluate T2, because Y happens to be an input variable of T2. In fact, we can say the same about A, B and C. That is, we can evaluate T1 only when A and B are known, and we can evaluate T2 only when Y and C are known. The fact that Y comes as the result of another sub-expression changes nothing.
We can depict this as the following dependency graph:
Here variables are depicted as circles and expressions as rectangles. We will also call an expression evaluation, or any other indivisible unit of work, a task. Concave half-circles represent task inputs and convex half-circles represent outputs.
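To make the ordering concrete, here is a minimal sketch of the two-step computation using standard Java CompletableFuture objects as stand-ins for dataflow variables (this is only an illustration, not the TQL Engine API): T2 runs only when both of its inputs are bound, no matter in which order the bindings arrive.

```java
import java.util.concurrent.CompletableFuture;

public class DependencyExample {
    public static void main(String[] args) {
        // Unbound "variables": each future is bound (completed) at most once.
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> c = new CompletableFuture<>();

        // T1: Y = A + B  -- can run only when both A and B are bound.
        CompletableFuture<Integer> y = a.thenCombine(b, Integer::sum);

        // T2: Z = Y - C  -- can run only when both Y and C are bound.
        CompletableFuture<Integer> z = y.thenCombine(c, (yv, cv) -> yv - cv);

        // Binding order does not matter: bind C first, then B, then A.
        c.complete(3);
        b.complete(4);
        a.complete(6);

        System.out.println(z.join()); // prints 7
    }
}
```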
Now let's summarize what we've just learned:
- Variables can be unbound (i.e. they don't have any value associated with them) or bound (i.e. they have such a value). This is a big departure from regular programming, where variables always have some [default] value. This is due to the fact that in "normal" programming variables are associated with memory locations (i.e. they are essentially symbolic memory addresses) and memory always has some content. The content can be wrong or random [garbage], but it exists nonetheless. Here, variables are instead associated with the edges of a dependency graph.
- Once a variable is bound, it can be used as input in dependent computations. Until then, those computations must wait. This is a very nice quality as it prevents our program from executing something prematurely and getting an exception or, worse, processing garbage data.
- Variables can also be "equal" to each other (e.g. A = B). This is a mere convenience. In this case variables simply point to each other and once one of them becomes bound to a value, all its "equals" automatically become bound to the same value.
- The fact that a variable becomes bound to its value can be seen as a signal to all awaiting dependent computations and therefore represents a synchronization mechanism between otherwise concurrent computations. In fact, a very nice one. Everything is automatic: no dealing with execution threads, explicit synchronization primitives, deadlocks etc. The compiler and the dataflow run-time can do all the [synchronization] work for us. It also does not matter whether different tasks are situated on different computers or defined in any odd order in our source code. The system will use the dataflow dependencies to execute all tasks in the correct order.
- It also does not matter in which order our input variables are bound. Any binding order will produce exactly the same result. This automatically takes care of any possible [communication] delays or other similar issues which are inherent to distributed systems with generally unreliable and/or slow communications.
- A task is an atomic unit of work. It starts when all its inputs become bound and, as a result, may bind one or more output variables. This provides a very clean separation of concerns between dataflow specifics and the actual work which needs to be done. The body of a task need not be concerned with data delivery or synchronization between tasks. The simple fact that a task was chosen for execution guarantees that all necessary conditions have been met. This provides a great deal of flexibility in how tasks can be implemented. Any existing programming technology can be used without concern. All we need to do is place the dataflow variables' values into some [temporary] memory locations before the task execution so that whatever regular programming technology is used to implement the task can access those values as regular [memory] variables.
- A collection of possibly interdependent tasks working toward a common goal is called a workflow.
- However, there is also a new and very unusual thing to deal with: each variable can be bound to a value only once. That is, if you have bound variable A to value 6, you cannot re-bind it later to 2 or any other value. It will remain bound to 6 for the rest of the program execution. This is very different from most other programming paradigms (a minimal sketch of such a single-assignment variable appears after this list), and it's both a "blessing" and a "curse":
- It's a "blessing" because it is the only thing which allows to achieve overall consistency in a distributed system. Mutable state is the exact reason why OOP, actors or any other paradigm based on mutable state is doomed to fail in a distributed environment. With mutable individual component states there is no any reliable way to reason about overall state of the distributed system. In other words, there is no such thing as a distributed object. The best one can do in OOP is to make two or more objects to "talk" to each other across the network, but in this case synchronization and consistency enforcement complexities tend to grow exponentially which drastically limits scalability and performance. Things like dynamic incremental system updates (when new functionality is introduced into a running system), migration of load and functionality between the nodes etc. on a large scale remains well beyond capabilities of OOP-based designs. Dataflow, on the other hand, can achieve all this rather easily.
- It's a "curse" because it renders pretty much all we know about programming so far almost useless. Even simple questions like: how do I program a loop with dataflow variables? don't have obvious answers as, obviously, dataflow dependency graph cannot have cycles.
Dataflow streams
The absence of obvious, or rather familiar, answers does not mean the absence of answers at all. In fact, all the seeming problems with variable immutability can be easily resolved with the concept of a dataflow stream.
Let's look again at how our workflow is executed. The one and only prerequisite for the execution of any task is that all of its input variables must be bound to some values, but in order for the computations to be meaningful, those values cannot be considered independent of each other: they must form sets of corresponding values. That is, for the first computation we have the set A = 6, B = 4 and C = 3, and for the second one A = 2, B = 3 and C = 1. Now we can think of the computation's serial number as a sort of global clock tick number. Each tick of this global clock increments the computation number and corresponds to another set of input values.
Another way to look at this is to imagine that each mutable variable is represented as a sequence of its consecutive values, or a stream of values. Each stream element has a serial number associated with it which, in turn, corresponds to the global clock tick number. We can represent this in a table:
| Tick# | A | B | C |
|---|---|---|---|
| 1 | 6 | 4 | 3 |
| 2 | 2 | 3 | 1 |
Here each row represents the set of values necessary to start the corresponding computation (in a series of computations of the same expression, but with different input data) and each variable column represents that variable's stream (i.e. the sequence of consecutive values the variable needs to be bound to for each computation).
As new sets of values continue to arrive, new rows are added to the table, each with the next serial [tick] number. It is important to understand that the duration of the tick itself is not important at all. For example, all new values may come in within the next millisecond, or A may arrive within the next minute and the other two within the next hour, but the tick (i.e. table row) will not be completed (and the corresponding computation will not start) until all of them are finally here.
Note that [in a distributed system] it is very possible that some values may arrive ahead of the others, so we might have a situation like this:
| Tick# | A | B | C |
|---|---|---|---|
| 1 | 6 | 4 | 3 |
| 2 | 2 | 3 | 1 |
| 3 |   | 4 | 7 |
| 4 |   | 6 |   |
| 5 |   | 9 |   |
where we have values of B all the way to tick #5 and values of C up to tick #3, but we are still waiting for A at tick #3 and later.
Once a row is filled and the corresponding computation (i.e. task) is completed, the row can be discarded from the table. So rows are added at the bottom and discarded from the top.
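A minimal sketch of this row bookkeeping might look as follows (the class name and the hard-coded Z = A + B - C are purely illustrative assumptions): a row fires as soon as all three of its inputs have arrived, in whatever order, and is discarded afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tick table for Z = A + B - C: a row fires and is discarded
// as soon as all three of its inputs (A, B, C) have arrived, in any order.
public class TickTable {
    private final Map<Long, Map<String, Integer>> rows = new HashMap<>();

    public void arrive(long tick, String name, int value) {
        Map<String, Integer> row = rows.computeIfAbsent(tick, t -> new HashMap<>());
        row.put(name, value);
        if (row.size() == 3) {                 // A, B and C are all bound
            int z = row.get("A") + row.get("B") - row.get("C");
            System.out.println("tick " + tick + ": Z = " + z);
            rows.remove(tick);                 // row is discarded once computed
        }
    }

    public static void main(String[] args) {
        TickTable t = new TickTable();
        t.arrive(2, "B", 3);   // values may arrive out of order
        t.arrive(1, "A", 6);
        t.arrive(2, "C", 1);
        t.arrive(1, "C", 3);
        t.arrive(1, "B", 4);   // tick 1 completes here: Z = 7
        t.arrive(2, "A", 2);   // tick 2 completes here: Z = 4
    }
}
```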
Now, in order to accommodate such streams of values in our dataflow programming paradigm, all we need to do is separate the notion of a dataflow variable name from the dataflow variable itself. That is, a variable name such as "A" is nothing more than a human-readable string which points to an actual dataflow variable instance, which, in turn, can be bound to a value. (You can say the same about "regular" variables: they point to memory addresses.) Once we are done with the variable (i.e. it is assigned as an input to a task or multiple tasks, is bound and consumed, etc.) the name can be used to point to the next variable in the dataflow stream, and so on and so forth. Really, we only need names so that we, humans, can read, compose and compile programs. (There is nothing new or unusual about this. Practically all modern programming languages use the same "dynamic mapping" between variable names and actual memory addresses. For example, the variables of a recursively called function will point to different memory locations for each recursive call.)
Once our dataflow tasks are compiled and ready for execution (and the executable code points to a memory address of a dataflow variable obtained through its name) we can advance the name pointer to the next variable instance in the stream and use the same variable names to compile the next task in the task stream. That is, nothing prevents us from considering our tasks themselves as dataflow variables of some sort as well (the sort with an attached program that computes and binds its value). Thus tasks and their results will form the same kind of streams of values (i.e. they will add columns to our table) as any other dataflow variable.
We can depict this as follows (the first three rows of our table before any computations start):
Here oval shapes represent dataflow streams, empty circles represent unbound variables and circles with numbers represent bound variables. Dashed arrows represent mappings between names and variables, and dotted arrows point to the corresponding next stream elements.
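Reusing the hypothetical DataflowVariable sketch from above, the name-to-instance mapping can be pictured as a movable pointer into a stream: the same name is reused for successive, independent single-assignment variables.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a "name" is just a movable pointer into a stream of
// single-assignment variables (DataflowVariable from the sketch above);
// advancing it lets the same name refer to the next element of the stream.
public class NamedStream<T> {
    private final List<DataflowVariable<T>> stream = new ArrayList<>();
    private int current = 0; // index of the variable the name points to now

    public DataflowVariable<T> variable() {
        while (stream.size() <= current) {
            stream.add(new DataflowVariable<>()); // grow the stream lazily
        }
        return stream.get(current);
    }

    public void advance() { // point the name at the next stream element
        current++;
    }
}
```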
The dataflow stream notion can easily be extended even further to processes (i.e. collections of collaborating tasks), process hierarchies etc.
Now let's assess the qualities of our system:
- First and foremost, we can now reason about the overall state of our [distributed] system at any given tick number (or serial number, or "version" of a stream value). Since each [stream element] value is still immutable, it is guaranteed to be the same across all distributed components of our system. It does not matter at what actual time or by what means the value has been delivered to each component (for all we know it can be written on a piece of paper and delivered by a horse rider three months after it was initially computed); still, we can say that some variable X at some moment T had some value A, and that value was exactly the same for all components across our system which observed that variable. This is an extremely important property which is not provided by any other programming paradigm based on mutable state. This property alone allows for virtually infinite scalability of our system across any type of network (including networks with horse-rider based communications). Recall that the actual time interval associated with each tick is of no consequence whatsoever, which means that any possible delays, failure recovery, repeated transfers etc. are automatically taken care of.
- Our system can now fully utilize all the computing power available to it at any given moment. Processing power allocation becomes completely orthogonal to the actual computations. Indeed, we can even prepare our computations in advance using unbound variables. Once any variable becomes bound, all dependent computations can be performed [concurrently] at once without regard to their serial numbers. For example, in the example above, the computations for (6,4,3) and (2,3,1) can be done concurrently since both value sets are known and the corresponding variables are bound. Note that CPU allocation is fully automatic and does not require any special controls unless desired. The system simply uses all available power to do as much work as possible at any given moment.
- We now have a great deal of flexibility in organizing and controlling our computations. We can even compute future values of variables before past ones. Imagine that due to a network delay the values for tick #3 arrive before those for tick #2 (a situation which is very possible with UDP communications, for example). This will have no effect whatsoever on the correctness of our computations. The system will simply compute the later result before the earlier one. Many optimizations, like loop unrolling, become automatic: when loop iterations have no dependencies on each other, all iterations can be computed concurrently (number of CPUs permitting, of course), as the sketch after this list illustrates.
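As a small illustration of that last point (a sketch only, using a plain Java parallel stream rather than any dataflow machinery): when iterations are independent, they may execute concurrently and in any order, and the results are still exactly the same.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Hypothetical sketch: independent loop iterations are just independent rows
// of the tick table, so they may run concurrently and in any order.
public class IndependentIterations {
    public static void main(String[] args) {
        int[][] inputs = { {6, 4, 3}, {2, 3, 1}, {4, 7, 2} };
        int[] results = IntStream.range(0, inputs.length)
                .parallel() // iterations may execute concurrently, in any order
                .map(i -> inputs[i][0] + inputs[i][1] - inputs[i][2])
                .toArray();
        // The result is the same regardless of execution order: [7, 4, 9]
        System.out.println(Arrays.toString(results));
    }
}
```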
Dataflow framework
In order to control all that power we need some programming constructs. Therefore raw dataflow streams are usually wrapped into dataflow channels or dataflow buffers.
Dataflow channels
A dataflow channel is simply a handle to an underlying dataflow stream. As stream elements are constantly added to the end and removed from the beginning of the stream, the channel keeps and updates its reference to the first unbound element of its underlying stream. In other words, a dataflow channel represents a volatile dataflow variable.
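A greatly simplified sketch of this idea (hypothetical, built on the DataflowVariable sketch above, and omitting the linked structure of the underlying stream): the channel always refers to the first unbound element, and every put() binds it and advances to a fresh one.

```java
// Hypothetical channel sketch: a handle that always refers to the first
// unbound element of a stream of single-assignment variables.
public class DataflowChannel<T> {
    private DataflowVariable<T> head = new DataflowVariable<>();

    // Bind the current head and advance to a fresh unbound element, so the
    // channel behaves like a variable that can be "written" repeatedly even
    // though each underlying stream element is bound only once.
    public synchronized void put(T value) {
        head.bind(value);
        head = new DataflowVariable<>();
    }

    // The current (first unbound) element; tasks keep their own references
    // to the earlier elements they depend on.
    public synchronized DataflowVariable<T> current() {
        return head;
    }
}
```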
Dataflow buffers
A dataflow buffer is a channel which can control how many unbound stream elements may exist at any given time. It encapsulates the abstraction of a bounded buffer (hence the name) which can have a size. Once an element becomes bound it is removed from the buffer, and if the remaining number of elements is less than the requested size, new unbound elements are automatically added to the buffer. Thus, buffers are usually controlled by two parameters:
- Limit – the total number of stream elements which this buffer can hold. New elements will not be created unless the actual number of buffer elements is less than this number.
- Live – the number of elements which can be active (i.e. actively performing computations) at any given time. This is applicable to buffers which hold tasks and processes. Obviously, the live counter cannot exceed the limit counter.
Buffers provide a mechanism to fine-tune load distribution between different sub-components and make it possible to handle different usage patterns. For example, load often comes in bursts of activity followed by periods of relative calm. Having a buffer keep enough pre-compiled instances to handle a burst, and using the pauses to prepare for the next one, spreads the load more evenly over time and increases the overall capacity of the system. For example, if we have a buffer of HTTP request processors with limit=10 and live=8, then a total of 8 HTTP requests can be processed concurrently at any given time (the rest will wait for the next available instance), and 2 = 10 - 8 more instances stand by as a "hot standby", ready to take the next request at any moment without any preparation delays. This allows tuning the trade-off between the total memory allocated by all HTTP request processors and the time required to ready those processors to take requests. By increasing the limit we allocate more memory (i.e. create more instances in advance) but reduce system response time. By increasing the live counter we can better handle bursts of activity, again at the price of greater memory allocation for HTTP processing.
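A rough sketch of the two parameters (a hypothetical class, not the TQL Engine API): "limit" pre-created instances are kept ready, while a semaphore caps how many of them may be active at once.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical buffer sketch: "limit" instances are kept pre-created, but at
// most "live" of them may be executing at the same time.
public class DataflowBuffer<T> {
    private final BlockingQueue<T> idle;   // pre-created, ready-to-run instances
    private final Semaphore live;          // how many may be active at once
    private final Supplier<T> factory;

    public DataflowBuffer(int limit, int liveCount, Supplier<T> factory) {
        this.idle = new ArrayBlockingQueue<>(limit);
        this.live = new Semaphore(liveCount);
        this.factory = factory;
        for (int i = 0; i < limit; i++) {
            idle.add(factory.get());       // fill the buffer up to its limit
        }
    }

    public T acquire() throws InterruptedException {
        live.acquire();                    // wait if "live" instances are already active
        return idle.take();                // take a pre-created instance
    }

    public void release(T usedInstance) {
        idle.offer(factory.get());         // top the buffer back up to its limit
        live.release();                    // one more instance may become active
    }
}
```

For the HTTP example above, new DataflowBuffer<>(10, 8, HttpProcessor::new) would keep 10 processors prepared while allowing 8 to run concurrently (HttpProcessor being a hypothetical request processor class).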
Implementation details
TQL Engine uses its own original implementation of dataflow which is completely free of explicit thread management, locking and synchronization. Instead, it is based on modern high-performance Compare-And-Set (CAS) primitives. Tasks and processes are dynamically assigned to actual threads only for execution (once the system has determined that all input variables are bound). No thread ever waits on a lock. This allows a very limited number of threads (usually defined as a small multiple of the number of available CPUs) to process a virtually unlimited number of tasks (limited only by available memory). In turn, each task occupies only the memory necessary to keep references to its input/output variables, which is very small. On the other hand, using dataflow buffers makes it possible to effectively control the trade-offs between memory consumption, system response time and overall performance.
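To give a flavor of the CAS-based, non-blocking style (a sketch under assumptions, not TQL's actual code): instead of making threads wait, consumers register callbacks that are dispatched once the value is bound, and the only synchronization is compareAndSet on an atomic reference plus a lock-free queue.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch of lock-free, non-blocking binding: no thread ever waits
// on a lock; dependent tasks are queued as callbacks and dispatched on bind().
public class AsyncDataflowVariable<T> {
    private final AtomicReference<T> value = new AtomicReference<>();
    private final ConcurrentLinkedQueue<Consumer<T>> waiters = new ConcurrentLinkedQueue<>();

    public void bind(T v) {
        if (!value.compareAndSet(null, v)) {   // single assignment, enforced by CAS
            throw new IllegalStateException("already bound");
        }
        Consumer<T> waiter;
        while ((waiter = waiters.poll()) != null) {
            waiter.accept(v);                  // hand the value to each dependent task
        }
    }

    public void whenBound(Consumer<T> task) {
        T v = value.get();
        if (v != null) {
            task.accept(v);                    // already bound: run immediately
            return;
        }
        waiters.add(task);                     // otherwise queue it; no thread blocks
        // Re-check to avoid losing the callback if bind() raced with us:
        v = value.get();
        if (v != null && waiters.remove(task)) {
            task.accept(v);                    // bind() missed it, so run it here
        }
    }
}
```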
All this contributes to the very high performance, reliability and fault tolerance of the TQL system.