Just as our planet is split between water and land, so the world of computation seems split between batch and stream. Data can either be processed with high throughput in the land of batch or at low latency in the realm of stream. For the implementation of any data-driven application the system architect must opt for a solution offering the semantics of one or the other computation model. Unless, of course, you opt for a solution based on Naiad – or so the paper Naiad: A Timely Dataflow System claims. Published by Microsoft Research Silicon Valley in 2013, it won the best paper award at SOSP 2013 and was still quite an insightful read six years later. Naiad is not the kind of paper whose implementation as described became particularly popular; rather, it’s the kind of paper whose ideas influenced many other systems that are state of the art today. TensorFlow, still the behemoth of machine learning libraries today, is an example of a system influenced by Naiad (two authors of the Naiad paper are also authors of the original TensorFlow paper).

The paper describes a computation model called timely dataflow that can support batch, streaming and iterative workloads. The authors start from first principles and define the constraints that a system and the data processed by that system would have to follow to allow such heterogeneous computation. Given the theoretical description of this model, they present a real, working implementation of a system called Naiad that realizes it. In the next few paragraphs I will try to condense the paper and explain it in my own words. I’ll expand on some parts where blog posts and other supplementary information helped clarify what was in the paper.

## Timely Dataflow

The computational model that underpins Naiad is called timely dataflow and is based on a directed, possibly cyclic graph. Vertices in this graph represent stateful operations on data and communicate with each other along directed edges. An external data producer provides timestamped input messages to the timely dataflow graph and an external consumer receives output messages from it.

### Timestamps

Messages sent across the dataflow graph are logically timestamped. Logical here means that the timestamp does not represent an actual point in time (like a UNIX timestamp) but instead indicates a particular stage during data processing. For now, we can say that this logical timestamp is an epoch counter that the input source to the timely dataflow graph must provide. The input source must also tell the timely dataflow graph when no more messages for a particular epoch will be sent (when the epoch is complete). If we’re running a batch workload, this would be a counter for the specific batch of data we are processing. This timestamp is preserved in the final output of the graph. The entire purpose of the system managing the timely dataflow graph is to transport data through the graph and propagate completeness of an epoch.

Saying that a logical timestamp is only an epoch counter is not the full truth. Timely dataflow graphs are allowed to have cycles, which represent loops or feedback in the computation (useful for iterative workloads). If we observe a message that has just left a cycle, how do we know how many times it looped through it? This information is required to further identify the progress a message has made through the graph and thus it is also embedded in the logical timestamp. A timestamp is thus really a tuple (e, <c_0, ..., c_k>) where e is the “user-provided” epoch counter and <c_0, ..., c_k> a list of loop counters. It is a list because cycles may be nested. The list is empty when the message enters the timely dataflow graph and upon entering a cycle, a new counter set to zero is appended to the list. Each time a message loops through a cycle, the counter for that cycle is incremented. If a message has entered three nested cycles and made two rounds of progress in the innermost one, this would be represented by the list <0, 0, 2>. When a message leaves a loop, the counter for that particular loop is removed.
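The bookkeeping on loop counters can be sketched in a few lines of Python. This is only an illustration of the rules described above; the `Timestamp` type and the function names `enter_loop`, `next_iteration` and `leave_loop` are my own assumptions and not part of Naiad's API.

```python
from typing import NamedTuple, Tuple


class Timestamp(NamedTuple):
    """Illustrative logical timestamp: an epoch plus one counter per enclosing loop."""
    epoch: int
    counters: Tuple[int, ...] = ()


def enter_loop(t: Timestamp) -> Timestamp:
    # Entering a cycle appends a fresh counter set to zero.
    return Timestamp(t.epoch, t.counters + (0,))


def next_iteration(t: Timestamp) -> Timestamp:
    # Going around the innermost cycle once increments its counter.
    if not t.counters:
        raise ValueError("timestamp is not inside a loop")
    return Timestamp(t.epoch, t.counters[:-1] + (t.counters[-1] + 1,))


def leave_loop(t: Timestamp) -> Timestamp:
    # Leaving a cycle removes the counter that belonged to it.
    if not t.counters:
        raise ValueError("timestamp is not inside a loop")
    return Timestamp(t.epoch, t.counters[:-1])


# A message of epoch 7 enters three nested loops and iterates twice in the innermost one.
inside = enter_loop(enter_loop(enter_loop(Timestamp(epoch=7))))
inside = next_iteration(next_iteration(inside))
outside = leave_loop(inside)
```

Here `inside.counters` is `(0, 0, 2)`, matching the example in the text, and `outside.counters` is `(0, 0)` once the innermost loop has been left.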

Notice that the timestamp alone does not yet uniquely identify the position of a message in every timely dataflow graph. If there are two (unnested) loops in a graph, a message will have the same timestamp whenever it is outside of a loop (just the epoch counter). To really pinpoint the progress of a message through the graph there are pointstamps. A pointstamp is the combination of a timestamp and a location in the graph (a vertex or an edge). Every message has a pointstamp associated with it at any given moment in its journey through the timely dataflow graph.

Timestamps have a partial order imposed on them which states that for two timestamps t_1 = (e_1, c_1) and t_2 = (e_2, c_2), t_1 <= t_2 if both e_1 <= e_2 and c_1 <= c_2, where the loop counters are compared in order (lexicographically).
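To see why this is only a partial order, here is a small Python sketch of the comparison. The `Timestamp` type and `less_equal` function are hypothetical names for illustration, and the sketch assumes both timestamps sit at the same loop nesting depth.

```python
from typing import NamedTuple, Tuple


class Timestamp(NamedTuple):
    epoch: int
    counters: Tuple[int, ...] = ()


def less_equal(a: Timestamp, b: Timestamp) -> bool:
    """a <= b iff the epochs are ordered AND the loop counters are ordered."""
    if len(a.counters) != len(b.counters):
        raise ValueError("timestamps must have the same loop nesting depth")
    # Python compares tuples lexicographically, which is the order we need
    # for the counters. We compare the two fields separately on purpose:
    # comparing the Timestamp tuples directly would give a total order.
    return a.epoch <= b.epoch and a.counters <= b.counters


a = Timestamp(1, (0, 2))
b = Timestamp(2, (0, 1))
c = Timestamp(2, (1, 0))

a_before_c = less_equal(a, c)                          # both components ordered
incomparable = not less_equal(a, b) and not less_equal(b, a)
```

`a` and `c` are ordered, but `a` and `b` are incomparable: `a` has the smaller epoch while `b` has the smaller loop counters, so neither precedes the other.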

### Vertex API

To implement the timely dataflow specification, vertices in the system must implement a certain API. This API has two purposes. First, it allows messages to be sent through the graph. Second, it punctuates the flow of messages and introduces synchronization points. The entire API, with which both batch and streaming semantics can be achieved, consists of just four methods:

• SendBy(Edge e, Message m, Timestamp t): When called by a vertex, sends a message m across an edge e to another vertex, at timestamp t,
• OnRecv(Edge e, Message m, Timestamp t): A callback invoked on a vertex in response to a call to SendBy from another vertex connected to this vertex by edge e,
• NotifyAt(Timestamp t): Called by a vertex to request a notification (by the system) once all messages with timestamps up to and including t have been received,
• OnNotify(Timestamp t): Invoked on a vertex (again a callback) once all messages with timestamp t' <= t have been delivered to it, provided a notification was previously requested via NotifyAt(t).

Note that a vertex typically does not “generate” or “decide” the timestamp when calling SendBy; the epoch is provided by the input source to the timely dataflow graph and is thereafter propagated together with the message it is associated with, with only the loop counters changing as a message enters, iterates through or leaves a cycle. It also helps to understand that all of these calls are asynchronous. Events, whether they are messages or notifications, are propagated through a timely dataflow graph by some scheduler. The scheduler decides when to deliver messages and notifications to vertices and can do so immediately or after some delay. Finally, one constraint placed on calls to SendBy and NotifyAt is that they can only be called with a timestamp greater than or equal to the timestamp t of the method in which they are invoked. For example, if NotifyAt(t') is called within OnRecv(e, m, t), then t' must be greater than or equal to t. This ensures time never flows backward through the graph.

One interesting observation to make here is that SendBy and OnRecv are sufficient to implement a streaming system, while NotifyAt and OnNotify allow implementation of a batch system. A streaming system can OnRecv a message, apply some per-message transformation (like map()) and then SendBy it to the next operator vertex. By contrast, a batch system would use OnRecv to buffer the message and wait for the synchronization point introduced by OnNotify to operate on the full batch of data it has collected up to that point to perform a batch operation (like reduce()), which it could then stream onwards with SendBy.

The code below, taken from the paper and commented by me, shows how a vertex could be implemented using this vertex API. The vertex has two outputs, one for the occurrence count of each message per timestamp, and one for distinct messages:

```csharp
class DistinctCount<S,T> : Vertex<T> {
    // Per-timestamp occurrence counts for each message
    Dictionary<T, Dictionary<S, int>> counts;

    void OnRecv(Edge e, S msg, T time) {
        // First time we see this timestamp
        if (!counts.ContainsKey(time)) {
            counts[time] = new Dictionary<S, int>();
            // We want to be notified when all messages up to and including time have been received
            this.NotifyAt(time);
        }
        // First time we see this message
        if (!counts[time].ContainsKey(msg)) {
            // Initialize
            counts[time][msg] = 0;
            // Send a distinct message
            this.SendBy(output1, msg, time);
        }
        counts[time][msg]++;
    }

    void OnNotify(T time) {
        // Send out aggregated counts
        foreach (var pair in counts[time])
            this.SendBy(output2, pair, time);
        // Clean up local state
        counts.Remove(time);
    }
}
```
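To make the interplay of the four calls concrete, here is a rough, runnable Python rendering of the same vertex, driven by a toy single-vertex scheduler. `ToyHarness` and its methods `feed`, `close` and `run` are hypothetical names I made up for this sketch; the real Naiad scheduler is distributed and far more involved.

```python
from collections import defaultdict, deque


class DistinctCount:
    """Python rendering of the DistinctCount vertex."""

    def __init__(self):
        self.counts = {}      # time -> {message -> occurrence count}
        self.harness = None   # set by ToyHarness

    def on_recv(self, edge, msg, time):
        if time not in self.counts:          # first message for this timestamp
            self.counts[time] = {}
            self.harness.notify_at(time)
        if msg not in self.counts[time]:     # first time we see this message
            self.counts[time][msg] = 0
            self.harness.send_by("output1", msg, time)
        self.counts[time][msg] += 1

    def on_notify(self, time):
        for pair in self.counts[time].items():
            self.harness.send_by("output2", pair, time)
        del self.counts[time]                # clean up local state


class ToyHarness:
    """Hypothetical single-vertex scheduler; an assumption, not Naiad's design."""

    def __init__(self, vertex):
        self.vertex = vertex
        vertex.harness = self
        self.queue = deque()                 # undelivered (message, time) pairs
        self.pending = set()                 # timestamps with a requested notification
        self.closed = -1                     # highest epoch the input source completed
        self.outputs = defaultdict(list)     # edge name -> emitted (message, time)

    def feed(self, epoch, messages):
        if epoch <= self.closed:
            raise ValueError("epoch already completed")
        self.queue.extend((m, epoch) for m in messages)

    def close(self, epoch):
        self.closed = max(self.closed, epoch)

    def send_by(self, edge, msg, time):
        self.outputs[edge].append((msg, time))

    def notify_at(self, time):
        self.pending.add(time)

    def run(self):
        # Deliver every queued message first ...
        while self.queue:
            msg, time = self.queue.popleft()
            self.vertex.on_recv("input", msg, time)
        # ... then notify for completed epochs, in timestamp order.
        for time in sorted(t for t in self.pending if t <= self.closed):
            self.pending.discard(time)
            self.vertex.on_notify(time)


harness = ToyHarness(DistinctCount())
harness.feed(0, ["a", "b", "a"])
harness.run()
distinct_so_far = list(harness.outputs["output1"])
counts_before_close = list(harness.outputs["output2"])
harness.close(0)
harness.run()
counts_after_close = list(harness.outputs["output2"])
```

The distinct messages appear on `output1` as soon as they are received (the streaming half), while the per-epoch counts on `output2` are only emitted after the input source has completed epoch 0 (the batch half).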


### Progress Tracking

The final topic from the theoretical timely dataflow model we should explore, before moving to the practical realization that is Naiad, is how progress gets tracked across the timely dataflow system. Progress tracking here refers to the way in which the timely dataflow implementation maintains knowledge about which timestamps are still being processed by the system and for which timestamps no more new messages will be received. The external data source has to tell the timely dataflow graph’s input vertex when no more messages for a certain epoch will be entered into the graph, so this discussion is essentially about how a timely dataflow implementation is expected to propagate this information through the graph. This is crucial because it is what allows the system to decide when a notification may safely be delivered, and thus what makes the OnNotify callback possible.

Progress tracking is achieved by a form of “distributed reference counting”. To understand which timestamps are still active in the system, workers in a timely dataflow system maintain two counters per pointstamp. In this discussion a worker is a computational unit that processes a subset of the vertices in the timely dataflow graph. The first counter is called the occurrence count and tracks how many unprocessed events (messages and requested notifications) the worker believes are outstanding for a given pointstamp; a pointstamp with a non-zero occurrence count is called active. The second counter, the precursor count, is the number of other active pointstamps that could still result in this pointstamp, that is, that precede it in logical time and can reach its location in the graph. When the precursor count for an active pointstamp is zero, that pointstamp is said to be in the frontier. The scheduler is allowed to deliver a notification whose pointstamp is in the frontier, because nothing left in the system can still produce a message with a smaller or equal timestamp for that vertex.
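A toy version of this bookkeeping can be written down directly. The sketch below is a simplification under several assumptions of mine: the graph is acyclic (so timestamps are plain epochs), locations are vertices only, and precursor counts are recomputed from scratch rather than maintained incrementally. The graph `EDGES` and all function names are hypothetical.

```python
from collections import Counter

# Hypothetical three-stage pipeline: in -> map -> reduce
EDGES = {"in": ["map"], "map": ["reduce"], "reduce": []}


def reachable(src, dst):
    """True if dst can be reached from src along zero or more edges."""
    stack, seen = [src], set()
    while stack:
        v = stack.pop()
        if v == dst:
            return True
        if v not in seen:
            seen.add(v)
            stack.extend(EDGES[v])
    return False


def could_result_in(p, q):
    """An event at pointstamp p may lead to an event at pointstamp q."""
    (t1, v1), (t2, v2) = p, q
    return t1 <= t2 and reachable(v1, v2)


def precursor_counts(occurrence):
    """For each active pointstamp, count the other active ones that could lead to it."""
    active = [p for p, n in occurrence.items() if n > 0]
    return {q: sum(1 for p in active if p != q and could_result_in(p, q))
            for q in active}


def frontier(occurrence):
    return {q for q, n in precursor_counts(occurrence).items() if n == 0}


# Two messages of epoch 0 wait at "map", "reduce" has one outstanding event
# for epoch 0, and three messages of epoch 1 wait at "in".
occurrence = Counter({(0, "map"): 2, (0, "reduce"): 1, (1, "in"): 3})
frontier_before = frontier(occurrence)

occurrence[(0, "map")] -= 2        # both epoch-0 messages at "map" are retired
frontier_after = frontier(occurrence)
```

Before the epoch-0 messages at `map` are retired, `(0, "reduce")` is held back by them. Afterwards nothing active can still reach it, so it enters the frontier, even though epoch 1 is already in flight further upstream.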

Each worker maintains its own occurrence and precursor counts. To deliver a notification, however, a worker must be sure that no event anywhere in the system could still result in the pointstamp in question. This matters even though pointstamps include a location, because in data-parallel implementations many workers may run replicas of the same vertex and process messages from the same epoch, so the events for a single pointstamp can be spread across workers. To maintain a (close to) global view of the propagation of messages through the system without the need for a global coordinator, timely dataflow systems implement distributed progress tracking. In this scheme a worker never updates its local counts directly based on its own observations (e.g. of a message being processed). Instead, it broadcasts the pending update to all workers, including itself, and every worker applies updates only as the broadcasts arrive. For example, if worker A retires a message, it does not immediately decrement the occurrence count for that message. It broadcasts that pointstamp p of that message is to receive an update of -1, and workers A, B and C each apply this update to their local counters upon receiving the broadcast. Local views may lag behind the true global state, but the protocol ensures that a worker’s local frontier never moves ahead of the global one, a property that is proven in another paper.
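The broadcast scheme can be illustrated with a tiny in-process simulation. This is only a sketch under my own assumptions: each worker has a FIFO inbox standing in for the network, and the sender receives its own broadcast like every other worker. The `Worker` class and `broadcast` function are hypothetical.

```python
from collections import Counter, deque


class Worker:
    def __init__(self, name):
        self.name = name
        self.counts = Counter()   # local view of the occurrence counts
        self.inbox = deque()      # progress updates not yet applied (FIFO)

    def drain(self):
        """Apply all progress updates that have arrived so far."""
        while self.inbox:
            pointstamp, delta = self.inbox.popleft()
            self.counts[pointstamp] += delta


def broadcast(workers, pointstamp, delta):
    """Queue a progress update at every worker; nobody applies it directly."""
    for w in workers:
        w.inbox.append((pointstamp, delta))


a, b, c = Worker("A"), Worker("B"), Worker("C")
workers = [a, b, c]
p = (0, "reduce")

broadcast(workers, p, +1)          # a message for pointstamp p comes into existence
for w in workers:
    w.drain()

broadcast(workers, p, -1)          # worker A retires the message
b.drain()                          # only B has seen the update so far
views_midway = {w.name: w.counts[p] for w in workers}

for w in workers:
    w.drain()
views_final = {w.name: w.counts[p] for w in workers}
```

Midway through, B already believes the pointstamp is finished while A and C still count one outstanding event. The views disagree temporarily and converge once all broadcasts have been applied.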

## Naiad

The discussion so far has centered around timely dataflow, which is a theoretical model that supports:

• Structured loops to allow feedback in the dataflow,
• Stateful vertices that consume and produce records without global coordination,
• Notifications for vertices that have received all data for one round.

Besides this, the paper also describes a real system called Naiad that implements timely dataflow. The GitHub page for Naiad is still live and can be viewed at github.com/MicrosoftResearch/Naiad. I find that the timely dataflow model is really the main contribution of the paper, but there are a few points from the description of Naiad that are worth highlighting. The first is how Naiad deals with fault tolerance, the second is the programming interface exposed by Naiad and the last is a set of performance evaluations we can glance over. In general, think of Naiad as an idealized, data-parallel and distributed implementation of timely dataflow where the working set is meant to fit in the aggregate RAM of the available cluster.

### Fault Tolerance

Naiad has a simple interface to achieve fault tolerance. Each vertex in the dataflow graph can implement a Checkpoint and Restore API. The system scheduler will periodically (e.g. every time an epoch is completed) call Checkpoint on each vertex to allow the implementation to serialize its state. If a worker in the system were to fail, all remaining vertices would be reset to their latest valid checkpoint via Restore, and the vertices handled by the failed worker would be re-launched elsewhere and restored in the same way.
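As a rough sketch of what such a pair of methods might look like for a stateful vertex, consider the following Python class. The class, its method names and the use of JSON as the serialization format are assumptions for illustration and not Naiad's actual interface.

```python
import json


class CountingVertex:
    """Hypothetical stateful vertex that counts the messages it has seen."""

    def __init__(self):
        self.counts = {}

    def on_recv(self, msg):
        self.counts[msg] = self.counts.get(msg, 0) + 1

    def checkpoint(self) -> bytes:
        # Serialize the full in-memory state so it can be written to stable storage.
        return json.dumps(self.counts).encode("utf-8")

    def restore(self, blob: bytes) -> None:
        # Replace the current state with the state captured in the checkpoint.
        self.counts = json.loads(blob.decode("utf-8"))


vertex = CountingVertex()
for msg in ["a", "b", "a"]:
    vertex.on_recv(msg)
snapshot = vertex.checkpoint()       # e.g. taken when an epoch completes

vertex.on_recv("c")                  # work done after the checkpoint ...

recovered = CountingVertex()         # ... is lost when we fall back to the snapshot
recovered.restore(snapshot)
```

After the restore, `recovered.counts` contains only the state as of the checkpoint, so any input processed since then has to be supplied again.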

Periodic checkpointing can have severe performance repercussions because the entire in-memory state must be serialized and written to disk or external storage. To alleviate this, the paper proposes an alternative fault tolerance mechanism that implementers of vertices are free to use. In this approach each vertex maintains a continuous write-ahead log, appending every operation to it from inside its OnRecv() method. Calls to Checkpoint would then no longer need to serialize the whole state, but perhaps only some metadata. This alternative has two downsides, however. The first is that upon restoring, the entire write-ahead log has to be replayed, which takes longer than restoring from serialized state. The second is that writing to the write-ahead log can take a significant toll on throughput – the authors report up to a 2x reduction in throughput for some pipelines.
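The logging alternative can be sketched in the same style. In this hypothetical Python example a plain list stands in for durable log storage, and recovery replays the log from the beginning; none of the names come from Naiad.

```python
class LoggingVertex:
    """Hypothetical vertex that logs every input before applying it."""

    def __init__(self, log):
        self.log = log            # stand-in for a durable, append-only log
        self.counts = {}

    def on_recv(self, msg):
        self.log.append(msg)      # write ahead: log first, then mutate state
        self._apply(msg)

    def _apply(self, msg):
        self.counts[msg] = self.counts.get(msg, 0) + 1

    def restore(self):
        # Rebuild the state by replaying every logged operation in order.
        self.counts = {}
        for msg in self.log:
            self._apply(msg)


durable_log = []
vertex = LoggingVertex(durable_log)
for msg in ["a", "b", "a"]:
    vertex.on_recv(msg)

# Simulate a crash: the in-memory state is gone, only the log survives.
replacement = LoggingVertex(durable_log)
replacement.restore()
```

Both downsides are visible even in this toy: every `on_recv` pays for an extra write, and `restore` has to walk the whole log instead of loading a single snapshot.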

### Programming Interface

The Naiad implementation of timely dataflow exposes a fairly low-level programming interface compared to other popular data processing platforms. Many data analysis systems offer a query language based in some way or another on SQL. Instead, Naiad exposes only an API for assembling graphs using stages, an abstraction over a collection of vertices, and connectors, an abstraction over edges. This resembles the Spark interface. The Naiad engine then takes a description of a graph of stages and connectors and maps this to vertices and edges that it will run on workers during execution. The following snippet, taken from the paper, is an example of implementing the map-reduce paradigm with the Naiad API:

```csharp
// 1a. Define input stages for the dataflow.
var input = controller.NewInput<string>();
// 1b. Define the timely dataflow graph.
var result = input.SelectMany(y => map(y))
                  .GroupBy(y => key(y), (k, vs) => reduce(k, vs));
// 1c. Define output callbacks for each epoch
result.Subscribe(result => { ... });
// 2. Supply input data to the query.
input.OnNext(/* 1st epoch data */);
input.OnNext(/* 2nd epoch data */);
input.OnNext(/* 3rd epoch data */);
input.OnCompleted();
```
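To make the roles of the map, key and reduce functions in this pipeline more tangible, here is a plain Python sketch of what one epoch of such a query computes. It runs on a single machine with no dataflow engine involved; `run_epoch` and the word-count functions are hypothetical and only mirror the shape of the query.

```python
from collections import defaultdict


def run_epoch(records, map_fn, key_fn, reduce_fn):
    """Evaluate one epoch: flat-map every record, group by key, reduce each group."""
    groups = defaultdict(list)
    for record in records:
        for item in map_fn(record):              # corresponds to SelectMany
            groups[key_fn(item)].append(item)    # corresponds to the GroupBy key
    return {k: reduce_fn(k, vs) for k, vs in groups.items()}


# Word count as the classic example.
def split_words(line):
    return line.split()


def identity(word):
    return word


def count(key, values):
    return len(values)


first_epoch = run_epoch(["a b a", "b c"], split_words, identity, count)
second_epoch = run_epoch(["c c"], split_words, identity, count)
```

Each call plays the role of one `input.OnNext(...)`: the result for an epoch depends only on the data supplied for that epoch, which is what the per-epoch `Subscribe` callback would receive.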


More examples can be found in the Examples/ folder in Naiad’s GitHub repository. The idea behind exposing such a low level API is that it allows many higher level APIs to be built on top of it. For example, various SQL dialects could be provided to end-users that the query engine could then translate to the same Naiad graph description. It also allows, in some sense, a separation of concerns. Instead of coupling the programming interface to the semantics and implementation of the system, this approach allows languages to be developed independently from the underlying system. Researchers focusing on query language design can focus just on this, while systems researchers can focus on improving the core engine.

### Performance Evaluation

While I find that the major contribution of this paper is the theoretical timely dataflow model, it is still important to understand the performance characteristics of a system implementing this model. If the model restricts the performance of even the most optimized implementation, then the model itself is the bottleneck. The paper reports four evaluations comparing the performance as well as expressiveness of Naiad with other systems for different tasks. The first task is batch iterative computation on graphs, the second is batch iterative machine learning, the third is streaming acyclic computation and the last combines streaming and iterative batch computation. In all evaluations Naiad is able to outperform comparable systems significantly. For example, for the streaming task, median latency is reduced 250x compared to Kineograph. The reason given is reduced synchronization overhead in Naiad.

## Conclusion

The Naiad paper makes two contributions to the distributed systems design community: a generalized model for high-throughput iterative and low-latency streaming computation called timely dataflow, and an efficient practical implementation of this model called Naiad. While the Naiad system implementation has not become a popular framework itself (this was never the goal), it has influenced other systems such as TensorFlow that are popular today. Furthermore, one of the authors seems to be working on an open-source re-implementation in Rust which has some traction too. I think it’s a worthwhile read for anyone in the space of stream processing, batch processing, machine learning systems and/or data processing language design.