The adoption of SQL-like interfaces is one interesting extension to Map&Reduce. SQL-like statements allow to express parallel computations with no need to write code in a specific programming language (Java, C++ or whatever). These extensions has been originally inspired by Google's Sawzall and they have been made popular by Yahoo's Hadoop Pig and by Facebook's Hadoop Hive. All those extensions are very useful syntactic sugar, but they do no extend the Map&Reduce model.
Scope (Structured Computations Optimized for Parallel Execution) is a new declarative and extensible scripting language targeted massive data analysis. Quoting the paper "Scope: Easy and Efficient Parallel Processing of Massive Data Sets": However, this model (Map&Reduce) has its own set of limitations. Users are forced to map their applications to the map-reduce model in order to achieve parallelism. For some applications this mapping is very unnatural. Users have to provide implementations for the map and reduce functions, even for simple operations like projection and selection. Such custom code is error-prone and hardly reusable. Moreover, for complex applications that require multiple stages of map-reduce, there are often many valid evaluation strategies and execution orders. Having users implement (potentially multiple) map and reduce functions is equivalent to asking users specify physical execution plans directly in database systems. The user plans may be suboptimal and lead to performance degradation by orders of magnitude"
In other words, you can leverage loops, pipes and many other types of parallel patterns with NO need of emulating those steps as in pure Map&Reduce. The process is transparent to the user since she can just express a collection of SQL-like statements (a "Script") and the compiler will generate the more appropriate parallel execution patterns. For instance, (Example 3, in the article)
R1 = SELECT A+C AS ac, B.Trim() AS B1where A, B, C are colums in a SQL-like schema, and particular StringOccurs C# string function is used to filter the column C. This example shows how to write a user-defined function in scope.
FROM R WHERE StringOccurs(C, “xyz”) > 2
#CS public static int StringOccurs(string str, string ptrn)
{ int cnt=0; int pos=-1;
while (pos+1 < str.Length) {
pos = str.IndexOf(ptrn, pos+1);
if (pos < 0) break;
cnt++;
}
return cnt;
}
#ENDCS
Scope runs on the top of Cosmos storage system. "The Cosmos Storage System is an append-only file system that reliably stores petabytes of data. The system is optimized for large sequential I/O. All writes are append-only and concurrent writers are serialized by the system. Data is distributed and replicated for fault tolerance and compressed to save storage and increase I/O throughput."
The language Scope supports:
- Join: SQL-like;
- Select: SQL-like;
- Reduce: "it takes as input a rowset that has been grouped on the grouping columns specified in the ON clause, processes each group, and outputs zero, one or multiple rows per group";
- Process: "it takes a rowset as input, processes each row in turn, and outputs a sequence of rows";
- Combine: "it takes two input rowsets, combines them in some way, and outputs a sequence of rows";
Note that the users need to know neither the optimal mapping nor the optimal number of processes allocated for a particular SCOPE script execution. Everything is transparently computed by the SCOPE compiler and mapped on the top of a well defined set of "parallel design patterns". Dryad provides the basic primitives for SCOPE execution and Cosmos job allocation. Please refer this page if you are interested in more information about Dryad, or if you want to download the academic release of DryadLINQ.
This power of expression allows to express massive computations with very little SQL-like scripting (again from the above paper):
SELECT query, COUNT() AS count FROM "search.log"Here petabytes (and more!) of searchlogs are stored on the distributed Cosmos storage and processed in parallel to mine the one with at least 1000 "counts". Another example, this time with joins:
USING LogExtractor GROUP BY query
HAVING count > 1000
ORDER BY count DESC;
OUTPUT TO "qcount.result";
SUPPLIER = EXTRACT s_suppkey,s_name, s_address, s_nationkey, s_phone, s_acctbal, s_commen FROM "supplier.tbl" USING SupplierExtractor;
PARTSUPP = EXTRACT ps_partkey, ps_suppkey, ps_supplycost FROM "partsupp.tbl" USING PartSuppExtractor;
PART = EXTRACT p_partkey, p_mfgr FROM “part.tbl" USING PartExtractor;
// Join region, nation, and, supplier
// (Retain only the key of supplier)
RNS_JOIN = SELECT s_suppkey, n_name FROM region, nation, supplier WHERE r_regionkey == n_regionkey AND n_nationkey == s_nationkey;
// Now join in part and partsupp
RNSPS_JOIN = SELECT p_partkey, ps_supplycost, ps_suppkey, p_mfgr, n_name FROM part, partsupp, rns_join WHERE p_partkey == ps_partkey AND s_suppkey == ps_suppkey;
// Finish subquery so we get the min costs
SUBQ = SELECT p_partkey AS subq_partkey, MIN(ps_supplycost) AS min_cost FROM rnsps_join GROUP BY p_partkey;
// Finish computation of main query
// (Join with subquery and join with supplier
// again to get the required output columns)
RESULT = SELECT s_acctbal, s_name, p_partkey, p_mfgr, s_address, s_phone, s_comment FROM rnsps_join AS lo, subq AS sq, supplier AS s WHERE lo.p_partkey == sq.subq_partkey AND lo.ps_supplycost == min_cost AND lo.ps_suppkey == s.s_suppkey ORDER BY acctbal DESC, n_name, s_name, partkey;
// output result OUTPUT RESULT TO "tpchQ2.tbl";
Cosmos and Scope are an important part of the Bing & Hotmail systems, as described in "Server Engineering Insights for Large-Scale Online Services, an in-depth analysis of three very large-scale production Microsoft services: Hotmail, Cosmos, and Bing that together capture a wide range of characteristics of online services
No comments:
Post a Comment