Wednesday, September 8, 2010

Cosmos: Massive computations at Microsoft (comparison with Map&Reduce, Hadoop and Skeleton Programming)

Map&Reduce is a distributed computation paradigm made popular by Google. Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. Many people believe that Map&Reduce was inspired by the traditional map and fold (reduce) operations typically adopted in functional programming. I believe that there are additional similarities with the less popular skeleton programming paradigm (the equivalent of design patterns for parallel computation). In fact, Map&Reduce can be considered a particular type of farm, where the collectors/reducers are guaranteed to receive the data sorted by key. Both Map&Reduce and Skeleton Programming delegate to the compiler the task of mapping the expressed computation onto the execution nodes. In addition, both of them hide the boring data marshaling/un-marshaling operations. Some people have pointed out that one limit of Map&Reduce is the difficulty of expressing more complex types of computation, such as loops or pipes. In general, those operations are emulated by an external program that coordinates the execution of several Map&Reduce stages. Within a given Map&Reduce stage the system can exploit parallelism, but different stages are executed in sequence.
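To make the farm analogy concrete, here is a minimal single-process sketch in Python of one Map&Reduce stage, plus an external driver that chains two stages in sequence. All the names (`map_reduce`, `word_mapper`, `sum_reducer`, `two_stage_job`) are hypothetical and do not come from any real framework; a real system would distribute each phase over many machines.

```python
from itertools import groupby
from operator import itemgetter

def map_reduce(records, mapper, reducer):
    """Run one map -> sort-by-key -> reduce stage in a single process."""
    # Map phase: each record yields zero or more (key, value) pairs.
    pairs = [kv for record in records for kv in mapper(record)]
    # Shuffle phase: sorting by key is what guarantees that the reducers
    # see the keys in order, with all the values of a key together.
    pairs.sort(key=itemgetter(0))
    # Reduce phase: one reducer call per distinct key.
    return [reducer(key, [value for _, value in group])
            for key, group in groupby(pairs, key=itemgetter(0))]

def word_mapper(line):
    for word in line.split():
        yield (word, 1)

def sum_reducer(key, values):
    return (key, sum(values))

def two_stage_job(lines):
    """Hypothetical external driver: stage 2 starts only after stage 1 ends."""
    counts = map_reduce(lines, word_mapper, sum_reducer)
    # Stage 2 regroups the words by their count.
    return map_reduce(counts,
                      lambda kv: [(kv[1], kv[0])],
                      lambda count, words: (count, sorted(words)))
```

The driver is ordinary sequential code: nothing in the model itself expresses the dependency between the two stages, which is the limitation discussed above.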

The adoption of SQL-like interfaces is one interesting extension to Map&Reduce. SQL-like statements make it possible to express parallel computations with no need to write code in a specific programming language (Java, C++ or whatever). These extensions were originally inspired by Google's Sawzall, and they have been made popular by Yahoo's Hadoop Pig and by Facebook's Hadoop Hive. All those extensions are very useful syntactic sugar, but they do not extend the Map&Reduce model.

SCOPE (Structured Computations Optimized for Parallel Execution) is a new declarative and extensible scripting language targeted at massive data analysis. Quoting the paper "SCOPE: Easy and Efficient Parallel Processing of Massive Data Sets": "However, this model [Map&Reduce] has its own set of limitations. Users are forced to map their applications to the map-reduce model in order to achieve parallelism. For some applications this mapping is very unnatural. Users have to provide implementations for the map and reduce functions, even for simple operations like projection and selection. Such custom code is error-prone and hardly reusable. Moreover, for complex applications that require multiple stages of map-reduce, there are often many valid evaluation strategies and execution orders. Having users implement (potentially multiple) map and reduce functions is equivalent to asking users specify physical execution plans directly in database systems. The user plans may be suboptimal and lead to performance degradation by orders of magnitude."

In other words, you can leverage loops, pipes and many other types of parallel patterns with NO need to emulate those steps as in pure Map&Reduce. The process is transparent to the user, since she can just write a collection of SQL-like statements (a "Script") and the compiler will generate the most appropriate parallel execution patterns. For instance (Example 3 in the paper):
R1 = SELECT A+C AS ac, B.Trim() AS B1
FROM R WHERE StringOccurs(C, "xyz") > 2

#CS
public static int StringOccurs(string str, string ptrn)
{
    int cnt = 0; int pos = -1;
    while (pos + 1 < str.Length) {
        pos = str.IndexOf(ptrn, pos + 1);
        if (pos < 0) break;
        cnt++;  // one more occurrence found
    }
    return cnt;
}
#ENDCS

where A, B, C are columns in a SQL-like schema, and the C# function StringOccurs is used to filter on column C. This example shows how to write a user-defined function in SCOPE.
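For readers who want to try the logic outside Cosmos, the same selection can be sketched in plain Python. This is only an illustration of what the script computes, not of how SCOPE executes it; the row layout (dicts with keys `A`, `B`, `C`) and the names `string_occurs` and `select_r1` are assumptions made for this sketch.

```python
def string_occurs(text, pattern):
    """Count occurrences of pattern in text, overlapping ones included."""
    count = 0
    pos = -1
    while pos + 1 < len(text):
        pos = text.find(pattern, pos + 1)
        if pos < 0:
            break
        count += 1
    return count

def select_r1(rows):
    """Rows are dicts with keys 'A', 'B', 'C' (an assumed layout)."""
    return [{"ac": row["A"] + row["C"], "B1": row["B"].strip()}
            for row in rows
            if string_occurs(row["C"], "xyz") > 2]
```

Note that, like the C# version, the search restarts one character after each match, so overlapping occurrences are counted.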

SCOPE runs on top of the Cosmos storage system. "The Cosmos Storage System is an append-only file system that reliably stores petabytes of data. The system is optimized for large sequential I/O. All writes are append-only and concurrent writers are serialized by the system. Data is distributed and replicated for fault tolerance and compressed to save storage and increase I/O throughput."

The SCOPE language supports:
  • Join: SQL-like;
  • Select: SQL-like;
  • Reduce: "it takes as input a rowset that has been grouped on the grouping columns specified in the ON clause, processes each group, and outputs zero, one or multiple rows per group";
  • Process: "it takes a rowset as input, processes each row in turn, and outputs a sequence of rows";
  • Combine: "it takes two input rowsets, combines them in some way, and outputs a sequence of rows";
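The three extensibility operators quoted above can be pictured as higher-order functions over rowsets. The following Python sketch is only an analogy under that assumption: the function names and signatures (`process`, `reduce_on`, `combine`) are hypothetical and are not the SCOPE API.

```python
from itertools import groupby

def process(rows, processor):
    """PROCESS: feed each row to processor, which yields zero or more rows."""
    for row in rows:
        yield from processor(row)

def reduce_on(rows, key, reducer):
    """REDUCE ... ON key: group the rowset on key, then call reducer per group."""
    ordered = sorted(rows, key=key)  # groupby needs equal keys to be adjacent
    for group_key, group in groupby(ordered, key=key):
        yield from reducer(group_key, list(group))

def combine(left, right, combiner):
    """COMBINE: hand both input rowsets to combiner, which yields output rows."""
    yield from combiner(list(left), list(right))
```

For example, `list(process(["a b", "c"], str.split))` splits each row into words, and `reduce_on` with a summing reducer gives a per-key total.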
One important difference from Map&Reduce-based SQL-like interfaces is that each SCOPE script is compiled into a flexible execution directed acyclic graph (DAG) instead of a rigid Map&Reduce structure. The graph represents the execution plan. "A physical execution plan is, in essence, a specification of Cosmos job. The job describes a data flow DAG where each vertex is a program and each edge represents a data channel. A vertex program is a serial program composed from SCOPE runtime physical operators, which may in turn call user-defined functions. All operators within a vertex program are executed in a pipelined fashion, much like the query execution in a traditional database system. The job manager constructs the specified graph and schedules the execution. A vertex becomes runnable when its inputs are ready. The execution environment keeps track of the state of vertices and channels, schedules runnable vertices for execution, decides where to run a vertex, sets up the resources needed to run a vertex, and finally starts the vertex program. The translation into an execution plan is performed by traversing the parse tree bottom-up. For each operator, SCOPE has default implementation rules. For example, implementation of a simple filtering operation is a vertex program using SCOPE's built-in physical operator “filter” provided with a function that implements the filtering predicate. Following the translation, the SCOPE compiler combines adjacent vertices with physical operators that can be easily pipelined into (super) vertices."
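The rule "a vertex becomes runnable when its inputs are ready" can be illustrated with a toy scheduler. The sketch below uses Python's standard `graphlib.TopologicalSorter` (Python 3.9+) and runs everything in one process; `run_job` and the shape of its arguments are assumptions made for this illustration and say nothing about how the Cosmos job manager is actually implemented.

```python
from graphlib import TopologicalSorter  # Python 3.9+

def run_job(vertices, inputs):
    """Execute a toy data-flow DAG.

    vertices: name -> callable taking the list of its input results.
    inputs:   name -> list of upstream vertex names (the data channels).
    """
    sorter = TopologicalSorter({name: inputs.get(name, []) for name in vertices})
    sorter.prepare()  # raises CycleError if the graph is not a DAG
    results, order = {}, []
    while sorter.is_active():
        # get_ready() returns every vertex whose inputs are all done; a real
        # job manager could dispatch these to different machines in parallel.
        for name in sorter.get_ready():
            upstream = [results[u] for u in inputs.get(name, [])]
            results[name] = vertices[name](upstream)
            order.append(name)
            sorter.done(name)
    return results, order
```

With an `extract` vertex feeding both a `filter` and a `count` vertex, the two become ready at the same time once `extract` finishes, which is exactly the kind of parallelism a fixed map-then-reduce pipeline cannot express.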

Note that users need to know neither the optimal mapping nor the optimal number of processes to allocate for a particular SCOPE script execution. Everything is transparently computed by the SCOPE compiler and mapped onto a well-defined set of "parallel design patterns". Dryad provides the basic primitives for SCOPE execution and Cosmos job allocation. Please refer to the Dryad project page if you are interested in more information about Dryad, or if you want to download the academic release of DryadLINQ.

This expressive power makes it possible to write massive computations with very little SQL-like scripting (again from the above paper):
SELECT query, COUNT(*) AS count FROM "search.log"
USING LogExtractor GROUP BY query
HAVING count > 1000;
OUTPUT TO "qcount.result";
Here petabytes (and more!) of search logs are stored on the distributed Cosmos storage and processed in parallel to find the queries that occur more than 1000 times. Another example, this time with joins:

SUPPLIER = EXTRACT s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment FROM "supplier.tbl" USING SupplierExtractor;
PARTSUPP = EXTRACT ps_partkey, ps_suppkey, ps_supplycost FROM "partsupp.tbl" USING PartSuppExtractor;
PART = EXTRACT p_partkey, p_mfgr FROM "part.tbl" USING PartExtractor;
// Join region, nation, and, supplier
// (Retain only the key of supplier)
RNS_JOIN = SELECT s_suppkey, n_name FROM region, nation, supplier WHERE r_regionkey == n_regionkey AND n_nationkey == s_nationkey;
// Now join in part and partsupp
RNSPS_JOIN = SELECT p_partkey, ps_supplycost, ps_suppkey, p_mfgr, n_name FROM part, partsupp, rns_join WHERE p_partkey == ps_partkey AND s_suppkey == ps_suppkey;
// Finish subquery so we get the min costs
SUBQ = SELECT p_partkey AS subq_partkey, MIN(ps_supplycost) AS min_cost FROM rnsps_join GROUP BY p_partkey;
// Finish computation of main query
// (Join with subquery and join with supplier
// again to get the required output columns)
RESULT = SELECT s_acctbal, s_name, p_partkey, p_mfgr, s_address, s_phone, s_comment FROM rnsps_join AS lo, subq AS sq, supplier AS s WHERE lo.p_partkey == sq.subq_partkey AND lo.ps_supplycost == min_cost AND lo.ps_suppkey == s.s_suppkey ORDER BY acctbal DESC, n_name, s_name, partkey;
// output result
OUTPUT RESULT TO "tpchQ2.tbl";
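The core of this script is a common pattern: compute a per-group minimum (SUBQ) and then join it back against the detailed rows (RESULT). A minimal single-machine sketch of that pattern in Python follows; the tuple layout and the name `cheapest_suppliers` are assumptions made for illustration, and the other joins of the script are left out.

```python
def cheapest_suppliers(offers):
    """offers: iterable of (partkey, suppkey, supplycost) tuples (assumed layout)."""
    offers = list(offers)
    # SUBQ: minimum supply cost per part.
    min_cost = {}
    for partkey, _, cost in offers:
        if partkey not in min_cost or cost < min_cost[partkey]:
            min_cost[partkey] = cost
    # RESULT: join the offers back against the per-part minimum.
    return [(partkey, suppkey)
            for partkey, suppkey, cost in offers
            if cost == min_cost[partkey]]
```

In SCOPE the user writes only the declarative statements; deciding how to partition and parallelize the grouping and the join is left to the compiler.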

Cosmos and SCOPE are an important part of the Bing and Hotmail systems, as described in "Server Engineering Insights for Large-Scale Online Services", an in-depth analysis of three very large-scale production Microsoft services (Hotmail, Cosmos, and Bing) that together capture a wide range of characteristics of online services.
