Optimizing Parallel Data Transformation

by Andrew Binstock

Visualization software relies substantially on the ability to process large amounts of data in parallel. Complex parallel processing, however, can be difficult to do efficiently using native threads. So, what are the practical alternatives?

Visualization applications often need to represent features of large volumes of data figuratively or graphically. Whether visualization is used in a business presentation, a scientific analysis, or a Monte Carlo simulation, it requires efficient processing of the underlying data. This article reviews why manually managing native threads with traditional techniques is insufficient. It then examines new models of concurrency that are gaining popularity and how they can be applied to large volumes of data to make optimal use of parallel hardware resources.

The Traditional Approach

Data decomposition is the principal technique for breaking up a problem so that the same operation can be performed in parallel over multiple data items. A simple example, presented in other articles on this site, consists of performing calculations on a large array of values. To process an array of 1 million values, for example, data decomposition would suggest creating multiple threads, each of which processes its assigned fraction of the array’s elements. On a quad-core processor, this might mean four threads, so that each core is kept busy doing calculations. The four cores then process the whole array in roughly one quarter of the time a single thread would take.
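The following minimal Python sketch makes the decomposition concrete. It splits an array across a fixed set of four threads. The function names and the square-and-sum workload are hypothetical stand-ins for “calculations.” In CPython, the global interpreter lock prevents CPU-bound threads from running truly in parallel, so the sketch shows the structure of the technique rather than its speedup.

```python
import threading

def process_chunk(data, start, end, results, index):
    # Each thread touches only its own slice and its own result slot,
    # so no locks are needed: the problem is embarrassingly parallel.
    total = 0
    for i in range(start, end):
        total += data[i] * data[i]  # hypothetical stand-in for "calculations"
    results[index] = total

def decompose_and_run(data, num_threads=4):
    n = len(data)
    chunk = (n + num_threads - 1) // num_threads  # ceiling division
    results = [0] * num_threads
    threads = []
    for t in range(num_threads):
        start = t * chunk
        end = min(start + chunk, n)
        th = threading.Thread(target=process_chunk,
                              args=(data, start, end, results, t))
        threads.append(th)
        th.start()
    for th in threads:
        th.join()  # wait for every slice before combining
    return sum(results)

values = list(range(1_000_000))
print(decompose_and_run(values))
```

The hard-coded `num_threads=4` is the weakness discussed next: the thread count does not follow the hardware.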

Problems like the one in this example, in which threads can process discrete chunks of data without fear of colliding with other threads and there are no inter-thread dependencies, are known as “embarrassingly parallel” problems. Such problems tend to have straightforward solutions that can be implemented well without much difficulty. Over time, the approach illustrated previously has evolved into something more efficient.

As I discussed in an earlier article, the basic approach is effective but not optimal. Because the number of threads is fixed at four, the design would not take advantage of additional cores if the code were ported to a machine with more of them. A better approach, one that scales well, is lightweight threading. Using this approach, data decomposition breaks a large task into small data tasks. These are then placed in the queue of a thread pool. The thread pool creates as many threads as there are processor cores and then assigns tasks to the threads as they become available. Lightweight threading works efficiently (it has some other benefits, which are discussed in that article), and special thread pool designs, such as those in the Intel® Threading Building Blocks library (Intel® TBB), can optimize performance even further.
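The same workload can be recast in lightweight-threading style. This sketch uses Python’s standard `concurrent.futures.ThreadPoolExecutor` as the thread pool, not Intel TBB, whose work-stealing scheduler is considerably more sophisticated. The 10,000-element task size and the function names are arbitrary assumptions.

```python
import os
from concurrent.futures import ThreadPoolExecutor

def process_task(data, start, end):
    # One small unit of work: the same calculation over a short slice.
    return sum(x * x for x in data[start:end])

def run_with_pool(data, task_size=10_000):
    # Size the pool to the machine instead of hard-coding a thread count.
    workers = os.cpu_count() or 1
    # Break the array into many small tasks; the pool's internal queue
    # hands each one to whichever worker thread becomes free next.
    ranges = [(i, min(i + task_size, len(data)))
              for i in range(0, len(data), task_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = pool.map(lambda r: process_task(data, r[0], r[1]), ranges)
        return sum(partials)

values = list(range(1_000_000))
print(run_with_pool(values))
```

Because the pool is sized from `os.cpu_count()`, the same code uses more workers on a machine with more cores, and the many small tasks keep all of them busy.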

However, lightweight threading is mostly confined to handling data that is embarrassingly parallel, and many problems cannot be solved in perfectly parallel ways. For example, there can be dependencies between threads, such that changes in data being handled by one thread affect values in another thread. In addition, data chunks are sometimes not all processed alike. Finally, a third class of problems needs updates in real time as the problem is being solved. That is, it does not have the luxury of waiting for all data chunks to be processed before providing an answer. (In this category, consider the tabulation of election returns: projections need to be made and voting trends visualized long before all the individual votes have been tallied.)

For these kinds of challenges, a series of new techniques is appearing. (Truth be told, some of these techniques are resurrected from theories put forth decades ago. The many-core era has made them suddenly relevant again and has wrought various improvements to the standard implementations. So, in a sense, even the old ways are now new.) Among these new tools are actors, dataflow, and MapReduce.


Actors are a concurrency abstraction that was first proposed in 1973, but only recently began to gain popularity. Actors can be thought of as thread-like entities with an inbox in which messages are deposited. They perform a specific task, generally in response to a message they receive from another actor. In most implementations, actors are limited to three possible types of operations:

  • receiving a message and performing a task in response,
  • sending a message to another actor,
  • creating other actors.

A pure actor-based program starts with one actor that creates other actors to perform specific tasks. This actor, or some of the actors it creates, reads data and determines what actions need to be performed, then sends the data as messages to whichever actor performs the indicated action. Those actors might well send the results they generate to other actors for subsequent processing. This process continues until the program completes.

On most systems, an actor is mapped to a hardware thread at execution time, and processing within the actor is single-threaded. If a task can be parallelized, one branch of the parallel work is sent off to another actor to perform. Actor-based systems can have anywhere from a handful of actors to thousands of them active at any given moment, and the runtime maps the actors onto the available threading resources as needed.

Messaging between actors is asynchronous. An actor sends a message to another actor and then resumes processing whatever messages are in its “inbox.” The receiving actor might return requested data at a later point. If so, it places that data as a message in the original actor’s inbox. Because the original actor does not preserve the context of its request, messages frequently include data that helps it re-establish that context. For example, a program might use an actor to look up a phone number for an individual. That actor receives a person’s name in its inbox and sends back a message with the phone number. To provide context, it will typically send back both the person’s name and the phone number, so that the requesting actor can decide what to do next. I’ll return to this duplication of data shortly.
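The phone-lookup exchange can be sketched in Python with one thread per actor and a `queue.Queue` as each inbox. This is a minimal illustration under stated assumptions, not how Erlang or any actor framework is implemented. The class names, the `(name, reply_to)` message shape, and the `None` shutdown message are all hypothetical conventions.

```python
import queue
import threading

class Actor:
    """Minimal actor: a private inbox drained by one dedicated thread."""

    def __init__(self):
        self.inbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send(self, message):
        # Asynchronous: the sender deposits the message and returns at once.
        self.inbox.put(message)

    def stop(self):
        self.inbox.put(None)  # hypothetical shutdown convention

    def _run(self):
        while True:
            message = self.inbox.get()  # messages are handled one at a time
            if message is None:
                break
            self.receive(message)

    def receive(self, message):
        raise NotImplementedError

class PhoneLookup(Actor):
    def __init__(self, directory):
        self.directory = dict(directory)  # private state, never shared
        super().__init__()

    def receive(self, message):
        name, reply_to = message
        # Echo the name with the number so the requester regains context.
        reply_to.send((name, self.directory.get(name)))

class Requester(Actor):
    def __init__(self):
        self.results = queue.Queue()  # lets the main thread observe replies
        super().__init__()

    def receive(self, message):
        name, number = message
        self.results.put(f"{name}: {number}")

lookup = PhoneLookup({"Ada": "555-0100", "Alan": "555-0199"})
me = Requester()
lookup.send(("Ada", me))
print(me.results.get(timeout=1))  # Ada: 555-0100
```

Because the reply repeats the name, the requesting actor can act on it without having remembered which lookup it asked for.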

The actor model is highly effective for parallel programming because it is both configurable and scalable. The language that initiated the current interest in actors is Erlang*, which was designed at the Swedish telecommunications company Ericsson* to run its telephone switches, systems that must handle enormous numbers of simultaneous calls. So, actors can definitely scale.

Actors also eliminate the problems of locks and mutual exclusion that torment traditional native threading. They do this by passing only immutable data between actors. In other words, once data is sent to an actor, no other actor can touch or change it in flight, and any changes the receiving actor makes are confined to its own private state. This requires developers to think slightly differently about program design, but after a while it becomes second nature to keep data changes within actors and to send only immutable data in messages.
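Python does not enforce immutability the way Erlang does, but a frozen dataclass gives a rough approximation for message types. The `Ballot` message type here is hypothetical. Note that `frozen=True` only blocks reassigning attributes, so a field holding a mutable object such as a list could still be changed in place.

```python
from dataclasses import dataclass, FrozenInstanceError, replace

@dataclass(frozen=True)
class Ballot:
    precinct: str
    candidate: str

msg = Ballot("P-12", "Smith")
try:
    msg.candidate = "Jones"  # any attempt to mutate the message fails
except FrozenInstanceError:
    print("message is immutable")

# A receiver "changes" data by building a new value; the original is intact.
corrected = replace(msg, candidate="Jones")
print(msg.candidate, corrected.candidate)  # Smith Jones
```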

This approach is not very different from the kind of programming used in service-oriented architecture (SOA), in which messages are sent to various Web services. If you were to view a program as a collection of asynchronous Web services, you would essentially have an actor-based program in the large.

An additional advantage of this model is that, because actors are the fundamental computational unit, development and testing are often much easier. Developers can validate an actor’s operation in isolation by testing every kind of message it can be sent. Code then comes close to being provably correct.
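One way to get that testability is to write an actor’s logic as a pure function from (state, message) to (new state, outgoing messages), keeping the thread and mailbox out of the behavior entirely. This sketch assumes that design. The vote-tallying actor and its message formats are hypothetical.

```python
def tally_behavior(state, message):
    """Hypothetical vote-tallying actor behavior, written as a pure function.

    state   -- dict mapping candidate -> count (never mutated in place)
    message -- ("vote", candidate) or ("report", None)
    returns -- (new_state, list of outgoing messages)
    """
    kind, payload = message
    if kind == "vote":
        new_state = dict(state)
        new_state[payload] = new_state.get(payload, 0) + 1
        return new_state, []
    if kind == "report":
        return state, [("totals", dict(state))]
    # Unknown messages are ignored rather than crashing the actor.
    return state, []

# Exercise each message type in isolation, with no threads involved.
s, out = tally_behavior({}, ("vote", "Smith"))
assert s == {"Smith": 1} and out == []
s, out = tally_behavior(s, ("report", None))
assert out == [("totals", {"Smith": 1})]
s2, out = tally_behavior(s, ("bogus", 42))
assert s2 == s and out == []
print("all message types handled")
```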

The principal downside is the one mentioned earlier: data frequently must be duplicated, as when context is preserved across asynchronous operations. A second downside is that data is not shared. Typically, it is not placed somewhere to be operated on by various actors; rather, actors process the data and then send it to another actor for further processing. So the data, in various states of transformation, can move between actors several times before the program completes. Despite these costs, actor-based parallel programming is highly efficient and fast, and it enables highly customized data operations to take place without recourse to locking mechanisms.

Because of this, actors are appearing in many new languages. For example, on the JVM, Scala* and Fan* both have actors, and Groovy* will add them by the end of the year. For pure Java* enthusiasts, there are two actor frameworks: Kilim (http://en.wikipedia.org/wiki/Kilim) and ActorFoundry (http://osl.cs.uiuc.edu/af/). In the .NET* world, Microsoft*’s Parallel Computing Platform team has been investigating actors, and an article describes how that work might appear in Visual Studio* 2010.

Users of Intel® TBB are also trying to create an actor framework based on the library, but as yet those efforts are embryonic.

While actors are an excellent general-purpose option for concurrent programming, there are other options for certain specific kinds of data management tasks.

MapReduce

Many data visualization problems involve handling a substantial amount of data and reducing it to smaller data sets that summarize its contents. Two common examples are reading in text and producing a word-frequency histogram, and reading voters’ ballots and summarizing the counts for each candidate in a table.

These problems are well suited to a technology known as MapReduce*, which was implemented by Google* (although similar technologies had previously been designed by Teradata*, a company that specializes in large, shared-nothing parallel data management).

In the MapReduce model, data is read in large blocks and sent to individual processes. These processes then map the data to an intermediate summary form. This intermediate form is then sent to other processes, where it is reduced into the final data form.

To return to the word-frequency example: large blocks of text are read in and sent to individual processes, which create word lists. These word lists are then sent to other processes, where they are sorted and tallied. The results are then summed by a final process that creates the desired histogram of word frequencies in the text. Because the word lists are generated and tabulated throughout the process, it’s possible to see the results as they are generated.
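The word-frequency example can be sketched in a single Python process, with a thread pool standing in for the cluster. This is not the API of Google’s MapReduce or of Hadoop. The function names are hypothetical, and splitting on whitespace without stripping punctuation is a simplifying assumption.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def map_block(text):
    # Map: turn one block of text into intermediate (word, 1) pairs.
    return [(word.lower(), 1) for word in text.split()]

def shuffle(mapped_lists):
    # Group the intermediate pairs by key so each reducer sees one word.
    groups = defaultdict(list)
    for pairs in mapped_lists:
        for word, count in pairs:
            groups[word].append(count)
    return groups

def reduce_word(item):
    # Reduce: collapse one word's list of counts into a total.
    word, counts = item
    return word, sum(counts)

def word_frequencies(blocks):
    with ThreadPoolExecutor() as pool:
        mapped = list(pool.map(map_block, blocks))   # map phase
        groups = shuffle(mapped)                     # shuffle phase
        reduced = pool.map(reduce_word, groups.items())  # reduce phase
        return dict(reduced)

blocks = ["the quick brown fox", "the lazy dog", "The end"]
print(word_frequencies(blocks))
```

The explicit shuffle step groups every count for a given word before reduction. That grouping is what lets each reducer work independently of the others.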

MapReduce has several aspects in common with actors. First, it is a shared-nothing model, meaning that data in transit is immutable. Second, communication between processes is handled via message passing. Finally, like actors, MapReduce scales very, very well. In the paper “MapReduce: Simplified Data Processing on Large Clusters,” Google engineers discuss running MapReduce on large clusters. They report that they often run computations with 200,000 map tasks and 5,000 reduce tasks on 2,000 worker machines. Such scalability would be hard to duplicate using traditional approaches.

The Google implementation of MapReduce includes process-management functions. For example, the supervisor code that farms out the input data to various processes also monitors the progress of those processes. If, after a predetermined period, there has been no output or response to pings, the process is deemed dead and its work is reassigned to another process.

Google also adds a way of potentially speeding up the final stage of a job. Once the program is nearing its conclusion and only a few tasks remain in progress, the supervisor program sends the same data to several nodes. As soon as one of them completes its work, the other duplicate processes are killed. This means that if one node happens to be slower than the others, its work will be completed sooner by one of the duplicate processes.
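The backup-task idea can be sketched with Python’s `concurrent.futures`: submit identical copies of a straggling task and take whichever result arrives first. The `count_words` task and its simulated slow copy are hypothetical. One caveat applies to this sketch only: Python threads cannot be killed, so the losing copies are abandoned rather than terminated.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def run_with_backups(task, data, copies=3, workers=3):
    """Run identical copies of one task and keep whichever finishes first."""
    pool = ThreadPoolExecutor(max_workers=workers)
    futures = [pool.submit(task, data, copy_id) for copy_id in range(copies)]
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    for f in not_done:
        f.cancel()  # only stops copies that have not started running yet
    # Don't block on stragglers: already-running copies finish in the
    # background and their results are simply ignored.
    pool.shutdown(wait=False)
    return next(iter(done)).result()

def count_words(text, copy_id):
    # Hypothetical task; copy 0 simulates a node that is running slowly.
    time.sleep(0.5 if copy_id == 0 else 0.05)
    return len(text.split())

print(run_with_backups(count_words, "to be or not to be"))  # 6
```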

There are several implementations of MapReduce available today. Google MapReduce uses C++ and has Python* and Java* mappings. Apache Hadoop* is a free open-source implementation in Java that was developed by Yahoo!* and is widely used today. There are numerous other implementations in many different languages.

Actors and MapReduce are especially suitable for processing data items with a logic-oriented mindset, the one typically favored in standard programming. However, certain problems are better solved by fixing the logic and managing the movement of data between logical functions. This approach is discussed next.


    Dataflow is a technique of manipulating data in parallel in which the design and emphasis is not placed on the logic, but on the movement of data between processes. At first glance, this does not seem like a large change, but in fact it reverses the way programs are designed and implemented.
    In a typical dataflow program, the designer creates building blocks that represent data processes and then flows data between the processes. The principal concern is how to sequence the process building blocks to get the optimal data flow that provides the right answer.

Most developers can see an echo of this in UNIX utilities. Each utility, whether grep, sed, awk, or uniq, manipulates incoming data in some way. That data is then piped to the next building block until it has been processed into its final form. The UNIX shell programmer faces a dataflow challenge: how to build a data flow from standard building blocks that delivers the needed data in the fewest possible steps.
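As a rough analogy in Python, each UNIX utility can be modeled as a small function that takes lines in and hands lines on. Composing the functions then mirrors a shell pipeline such as `grep error | sort | uniq -c`. The function names and the log lines are illustrative assumptions.

```python
from itertools import groupby

def grep(pattern, lines):
    # Pass through only the lines containing the pattern.
    return (line for line in lines if pattern in line)

def sort_lines(lines):
    return sorted(lines)

def uniq_c(lines):
    # Like `uniq -c`: count adjacent duplicates, so input should be sorted.
    return [(len(list(group)), line) for line, group in groupby(lines)]

log = ["disk error", "ok", "net error", "disk error", "ok"]
# Equivalent in spirit to: grep error | sort | uniq -c
print(uniq_c(sort_lines(grep("error", log))))  # [(2, 'disk error'), (1, 'net error')]
```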

One important distinction is that in dataflow, data is generally not streamed but passed from process to process in chunks. For a process to accept a new chunk, two things must be true: all of the chunk’s previous processing must be complete, and the process must have finished processing all previous chunks. These chunks can have associated tags, which alter how the process acts on the data. (In UNIX, think of specifying command-line parameters.) This aspect is key because it underscores a principal benefit of dataflow programming: the building blocks are not modified. Only the data or the tags flowing into them are. In addition to chunks and tags, dataflow programming can use special data items to signal events in a dataflow. For example, a dataflow might be modified by a data packet signaling that it is now 5 pm, after which all charges to the customer’s bill are processed using a different building block.
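The billing example can be sketched as follows. The sketch assumes chunks of call minutes, a `discount_pct` tag that acts like a command-line parameter, and an `Event("5pm")` control packet. The rates and all names are hypothetical. The building blocks are created once and never modified. Only the chunks, tags, and events flowing into them determine which block runs and how.

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class Chunk:
    minutes: tuple                            # data payload: call minutes
    tags: dict = field(default_factory=dict)  # e.g. {"discount_pct": 50}

@dataclass(frozen=True)
class Event:
    name: str                                 # control packet, e.g. "5pm"

def make_rate_block(cents_per_minute):
    """Build a fixed building block; its logic never changes at run time."""
    def block(chunk):
        discount = chunk.tags.get("discount_pct", 0)  # the tag alters behavior
        return [m * cents_per_minute * (100 - discount) // 100
                for m in chunk.minutes]
    return block

daytime = make_rate_block(10)
evening = make_rate_block(5)

def billing_flow(packets):
    current = daytime
    charges = []
    for packet in packets:  # each chunk is fully processed before the next
        if isinstance(packet, Event):
            if packet.name == "5pm":
                current = evening  # later chunks go to a different block
            continue
        charges.extend(current(packet))
    return charges

packets = [
    Chunk((3, 4)),
    Chunk((10,), {"discount_pct": 50}),
    Event("5pm"),
    Chunk((6,)),
]
print(billing_flow(packets))  # [30, 40, 50, 30]
```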

Scalability is obtained by running multiple building blocks in parallel. Because the building blocks are black boxes (completely immutable processes), they can be run anywhere on the system. The dataflow design and the data tags control their operation.
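One minimal way to run building blocks in parallel is to give each block its own thread and connect the blocks with queues. Different blocks can then work on different chunks at the same time. This Python sketch shows only that pipeline form, and the two sample blocks are hypothetical. Real dataflow systems can also replicate a block across many cores, and CPython’s global interpreter lock limits CPU-bound speedup here.

```python
import queue
import threading

DONE = object()  # sentinel marking the end of the flow

def stage(block, inbox, outbox):
    """Run one unmodified building block on its own thread."""
    while True:
        chunk = inbox.get()
        if chunk is DONE:
            outbox.put(DONE)  # propagate end-of-flow downstream
            return
        outbox.put(block(chunk))

def run_pipeline(blocks, chunks):
    queues = [queue.Queue() for _ in range(len(blocks) + 1)]
    threads = [
        threading.Thread(target=stage, args=(b, queues[i], queues[i + 1]))
        for i, b in enumerate(blocks)
    ]
    for t in threads:
        t.start()
    for c in chunks:
        queues[0].put(c)
    queues[0].put(DONE)
    results = []
    while (item := queues[-1].get()) is not DONE:
        results.append(item)
    for t in threads:
        t.join()
    return results

# Hypothetical blocks: normalize the text, then count words per chunk.
blocks = [str.lower, lambda s: len(s.split())]
print(run_pipeline(blocks, ["The quick fox", "jumps over", "THE lazy dog"]))  # [3, 2, 3]
```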

Dataflow programming is a highly effective way of processing large volumes of data in complex ways. However, the programming style itself differs substantially from imperative or OO-style programming, and it requires different tools. There is a wiki that discusses many tools and resources for dataflow programming.

Wrapping Up

In this article, I’ve examined some important alternatives to the manual use of native threads and locks for providing parallelism. Actors are a flexible mechanism that provides concurrency and can be closely tailored to specific program needs. Many pundits expect actors to be adopted by numerous languages and platforms as the means of delivering simple concurrency in the future. Given the number of languages already adding support for actor-based programming, this prediction seems to be coming true.

Dataflow is also a generic parallel programming technique. It is very fast, uses completely modular, reusable processes, and has been tested on very large data volumes. However, it demands a different kind of thinking, one that steps outside the ambit of conventional programming and customary tools.

Finally, MapReduce addresses a specific parallel problem, albeit one that occurs frequently: the condensation of data into a reduced representation. This process is used in data analysis and in data roll-ups for data visualization contexts such as interactive reports, real-time dashboards, and real-time data analysis.

Data visualization can draw on all of these techniques, so choose the ones that fit the specific need. Whichever you choose, you’re likely to find it faster and more trouble-free than traditional native thread programming.

Author Bio

Andrew Binstock writes technical white papers at Pacific Data Works LLC. He is also a senior contributing editor for InfoWorld and a columnist for SD Times. He is the author or co-author of several books on programming, including two available from Intel Press. During his free time, he contributes to the open-source typesetting and page-layout project, Platypus. He can be reached through his blog.

    For more complete information about compiler optimizations, see our Optimization Notice.