One of the projects we’re currently running in my group (Amdocs’ Technology Research) is an evaluation of the current state of different options for reporting on top of, and near, Hadoop (I hope I’ll be able to publish the results when we have them). Anyway, part of the preparations for the benchmark includes ingesting a lot of events (CDRs) into the system and creating different aggregations on top of them. For instance, for voice-call billing events we create yearly, monthly, weekly, daily and hourly aggregations at the subscriber level, which include measures like count of calls, average duration, sum of pricing, median balance, hourly distribution of calls, popular destinations etc.
We are using Spark to do the ingestion, and I thought there are two interesting aspects I can share which I haven’t seen too many examples of online, namely:
- doing multiple aggregations, i.e. aggregations that go beyond the word-count level
- writing to a Hadoop output format (Parquet in the example)
I created a minimal example, which uses a simple, synthesized input and demonstrates these two issues – you can get the complete code for it on GitHub. The rest of this post will highlight some of the points from the example.
Let’s start with the core Spark code, which is simple enough:
- Line 1 reads a CSV as a text file
- Line 3 does a simple parsing of the file, replacing each line with a class. The parsed RDDs are cached since we iterate over them multiple times (once for each aggregation)
- Lines 5 and 6 group by multiple keys. The nice way to do that is using SparkSQL, but the version I used (1.0.2) had very limited and undocumented SQL support (I had to go to the code); this should be better in future versions. If you still use “regular” Spark, I haven’t found a better way to group on multiple fields than creating the composite key by hand, which is what the getHourly and getWeekly methods do (create a string with the year, month etc. dimensions)
- Lines 11 and 12 do the aggregations themselves (which we’ll look into next). The output is again pair RDDs. This seems useless; however, it turns out you need pair RDDs if you want to save to Hadoop formats (more on that later)
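Since the full job lives on GitHub, here is a minimal, plain-Scala sketch of the composite-key trick; the Call fields and the key format are illustrative assumptions, not the actual repository code:

```scala
// Sketch of grouping by a hand-built composite key, the same trick the
// getHourly/getWeekly methods use. Call and the key layout are assumptions.
case class Call(subscriber: String, year: Int, month: Int,
                day: Int, hour: Int, durationSec: Int)

// Collapse the multiple grouping dimensions into a single string key.
def hourlyKey(c: Call): String =
  s"${c.subscriber}|${c.year}|${c.month}|${c.day}|${c.hour}"

val calls = Seq(
  Call("alice", 2014, 7, 1, 9, 60),
  Call("alice", 2014, 7, 1, 9, 120),
  Call("alice", 2014, 7, 1, 10, 30))

// On an RDD this would be keyBy(hourlyKey).groupByKey();
// on a plain collection, groupBy does the same job.
val grouped: Map[String, Seq[Call]] = calls.groupBy(hourlyKey)
```

The point is only that the “multiple fields” collapse into one key value that Spark can shuffle on.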
Once we do the group by key (lines 5 and 6 above) we have a collection of Call records (Iterable[Call]). Now, Scala has a lot of nifty functions to transform Iterables (map, flatMap, foldLeft etc.), but we have a long list of aggregations to compute, the Calls collection can get rather big (e.g. for yearly aggregates), and we have lots and lots of collections to iterate (terabytes and terabytes). So instead of using all these features I opted for a more traditional, Java-like aggregation which, while being pretty ugly, minimizes the number of iterations I make over the data.
You can see the result below (if you have a nicer-looking, yet efficient, idea I’d be happy to hear about it).
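To show the single-pass idea without the full job, here is a plain-Scala sketch with made-up field names (the real code computes many more measures, but the shape is the same):

```scala
// One mutable accumulator updated in a single traversal, instead of one
// traversal per measure. CallRecord and its fields are illustrative.
case class CallRecord(durationSec: Int, price: Double)

final class Aggregate {
  var count = 0
  var totalDuration = 0L
  var totalPrice = 0.0
  def add(c: CallRecord): Unit = {
    count += 1
    totalDuration += c.durationSec
    totalPrice += c.price
  }
  def avgDuration: Double =
    if (count == 0) 0.0 else totalDuration.toDouble / count
}

def aggregate(calls: Iterable[CallRecord]): Aggregate = {
  val acc = new Aggregate
  val it = calls.iterator
  while (it.hasNext) acc.add(it.next()) // the data is touched exactly once
  acc
}

val agg = aggregate(Seq(CallRecord(60, 1.5), CallRecord(120, 2.5)))
```

Ugly compared to chaining map/foldLeft per measure, but for terabytes of grouped data the single iteration is the property that matters.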
Save to Parquet
The end result of doing the aggregations is a hierarchical structure – a list of simple measures (averages, sums, counts etc.), but also a phone book, which has an array of pricings, and an hours breakdown, which is also an array. I decided to store that in the Parquet/ORC formats, which are efficient for queries in Hadoop (by Hive/Impala, depending on the Hadoop distribution you are using). For the purpose of the example I included the code to persist to Parquet.
If you can use SparkSQL, then support for Parquet is built in and you can do something as simple as:
You might have noticed that weeklyAggregated is a simple RDD and not a pair RDD, since one isn’t needed here. Unfortunately for me, I had to use Spark 0.9 (our lab is currently on Cloudera 5.0.1) – and in the more general case of writing to other Hadoop file formats, you can’t use this trick anyway.
One point specific to Parquet is that you can’t write to it directly – you have to use a “writer” class, and Parquet has Avro, Thrift and ProtoBuf writers available. I thought that was going to be nice and easy, so I looked for a Scala library to serialize to one of these formats and chose Scalavro (which I’ve used in the past). It turns out that, while its serialization is Avro-compatible, it does not produce the standard Avro classes the Avro writer for Parquet expects. No problem, I thought, I’ll just take another library, one that works with Thrift – Twitter has one, called Scrooge, which works nicely and all – but again it is not completely standard (it doesn’t inherit from TBase). Oh well, maybe protobuf would do the job, so I tried ScalaBuff; alas, this didn’t work well either (it inherits from LightMessage and not Message, as expected by the writer). I ended up using the plain Java generator for protobuf. This further uglified the aggregation logic, but at least it got the job done. The output from the aggregation is a pair RDD of protobuf-serializable aggregations.
So, why the pair RDD? It turns out all the interesting save functions that can use a Hadoop file format only exist on pair RDDs and not on regular ones. Once you know this little trick, the code is not very complicated:
The first two lines in the snippet above configure the writer and are specific to Parquet. The last line is the one that does the actual save to file – it specifies the output directory, the key class (Void, since we don’t need one with the Parquet format), the class for the records, the Hadoop output format class (Parquet in our case) and, lastly, a job configuration.
To summarize: when you want to do more than a simple toy example, the code may end up more complicated than the concise examples you see online. When working with lots and lots of data, the number of iterations you make over the data also begins to matter, and you need to pay attention to that.
Spark is a very powerful library for working on big data; it has a lot of components and capabilities. However, its biggest weakness (in my opinion, anyway) is its documentation. It makes it easy to start working with the platform, but when you want to do something a little more interesting you are left to dig around without proper directions. I hope this post will help some others when they do their own digging around :)
Lastly, again, a reminder that you can get the complete code for this on GitHub.
I presented big data to Amdocs’ product group last week. One of the sessions I did was recorded, so I might be able to add it here later. Meanwhile you can check out the slides.
Note that, trying to keep the slides visual, I put some of the information in the slide notes and not on the slides themselves.
Google’s Jeffrey Dean and Sanjay Ghemawat filed the patent request and published the map/reduce paper 10 years ago (2004). According to Wikipedia, Doug Cutting and Mike Cafarella created Hadoop, with its own implementation of map/reduce, one year later at Yahoo – both these implementations were done for the same purpose: batch indexing of the web.
Back then, the web began its “web 2.0” transition: pages became more dynamic and people began to create more content, so an efficient way to reprocess and build the web index was needed, and map/reduce was it. Web indexing was a great fit for map/reduce, since the initial processing of each source (web page) is completely independent of any other – i.e. a very convenient map phase – and you need to combine the results to build the reverse index. That said, even the core Google algorithm – the famous PageRank – is iterative (so less appropriate for map/reduce), not to mention that as the internet got bigger and the updates became more and more frequent, map/reduce wasn’t enough. Again, Google (who seem to be consistently a few years ahead of the industry) began coming up with alternatives like Percolator and Dremel (both papers were published in 2010; Percolator was introduced that year, and Dremel had been used inside Google since 2006).
So now it is 2014, and it is time for the rest of us to catch up with Google and get over map/reduce, for multiple reasons:
- end-users’ expectations (who hear “big data” but interpret that as “fast data”)
- iterative problems, like graph algorithms, which are inefficient since you need to load and reload the data on each iteration
- continuous ingestion of data (increments coming on as small batches or streams of events) – where joining to existing data can be expensive
- real-time problems – both queries and processing
In my opinion, map/reduce is an idea whose time has come and gone – it won’t die in a day or a year; there are still a lot of working systems that use it, and the alternatives are still maturing. I do think, however, that if you need to write or implement something new that would build on map/reduce, you should consider other options, or at the very least evaluate them carefully.
So how is this change going to happen? Luckily, Hadoop has recently adopted YARN (you can see my presentation on it here), which opens up the possibility to go beyond map/reduce without changing everything… even though, in effect, a lot will change. Note that some of the new options do have migration paths; we also still retain access to all that “big data” we have in Hadoop, as well as extended reuse of some of the ecosystem.
The first type of effort to replace map/reduce is to actually subsume it by offering more flexible batch processing. After all, saying map/reduce is not relevant doesn’t mean that batch processing is not relevant; it does mean there’s a need for more complex processes. There are two main candidates here, Tez and Spark: Tez offers a nice migration path, as it is replacing map/reduce as the execution engine for both Pig and Hive, while Spark has a compelling offering by combining batch and stream processing (more on this later) in a single engine.
The second type of effort, or processing capability, that will help kill map/reduce is MPP databases on Hadoop. Like the “flexible batch” approach mentioned above, this replaces a functionality that map/reduce was used for – unleashing the data already processed and stored in Hadoop. The idea here is twofold:
- To provide fast query capabilities* – by using specialized columnar data format and database engines deployed as daemons on the cluster
- To provide rich query capabilities – by supporting more and more of the SQL standard and enriching it with analytics capabilities (e.g. via MADlib)
Efforts in this arena include Impala from Cloudera, Hawq from Pivotal (which is essentially Greenplum over HDFS), startups like Hadapt, and even Actian trying to leverage their ParAccel acquisition with the recently announced Actian Vector. Hive is somewhere in the middle, relying on Tez on one hand and using vectorization and a columnar format (ORC) on the other.
The third type of processing that will help dethrone map/reduce is stream processing. Unlike the two previous types of effort, this covers ground that map/reduce can’t cover, even inefficiently. Stream processing is about handling a continuous flow of new data (e.g. events) and processing it (enriching, aggregating, etc.) in seconds or less. The two major contenders in the Hadoop arena seem to be Spark Streaming and Storm, though, of course, there are several other commercial and open-source platforms that handle this type of processing as well.
In summary – Map/Reduce is great. It has served us (as an industry) for a decade but it is now time to move on and bring the richer processing capabilities we have elsewhere to solve our big data problems as well.
Last note – I focused on Hadoop in this post even though there are several other platforms and tools around. I think that regardless of whether Hadoop is the best platform, it is the one becoming the de-facto standard for big data (remember Betamax vs. VHS?)
One really, really last note – if you’ve read up to here, and you are a developer living in Israel, and you happen to be looking for a job – I am looking for another developer to join my Technology Research team @ Amdocs. If you’re interested, drop me a note: arnon.rotemgaloz at amdocs dot com, or via my twitter/linkedin profiles.
* esp. in regard to analytical queries – operational SQL-on-Hadoop efforts like Phoenix, IBM’s BigSQL or Splice Machine are also happening, but that’s another story
Illustration idea found in James Mickens’s talk at Monitorama 2014 (which is, by the way, a really funny presentation – go watch it)… oh yeah, and Pulp Fiction :)
With Amdocs TeraScale, my previous project, moving into production, I moved to a new role within Amdocs and took over the Technology Research group, which is part of the big data and strategic initiatives business unit.
Now it is time to expand the group and I am looking for a developer-architect and/or senior developer to join my group.
- If you are a technologist at heart and like learning new stuff every day
- If you can pick up a new technology and be up and running with it in a day or two
- If you want to tinker with the latest and greatest technologies (big data, in memory grids, cloud management systems, NFV, columnar database etc.)
- If you want to help shape the technology roadmap of a large corporation
I am looking for you
The positions are located in Ra’anana, Israel. If you’re interested you can contact me via the contact form, my twitter, linkedin and/or my work mail at arnon.rotemgaloz at amdocs.com
Apparently there’s this new distributed architecture thing called microservices out and about – so last week I went ahead and read Martin Fowler’s & James Lewis’s extensive article on the subject, and my reaction to it was basically:
I guess it is easier to use a new name (Microservices) rather than say that this is what SOA actually meant – re http://t.co/gvhxDfDWLG
— Arnon Rotem-Gal-Oz (@arnonrgo) March 16, 2014
Similar arguments (nothing new here) were also expressed after Martin’s tweet of his article e.g. Clemens Vasters’ comment:
Or Steve Jones’ post “Microservices is SOA, for those who know what SOA is.”
Autonomy, smart endpoints, events etc. that the article talks about are all SOA concepts – if you happen to have read my book and you read this article, you’ve probably identified these as patterns like Inversion of Communication, Service Host, Service Instance, Service Watchdog and others.
So microservices, as they appear from Martin’s & James’s article, are pretty much service orientation without some of the bad misconceptions that got tied into the SOA moniker, like WS-*, ESBs as a must, etc. – perhaps that’s a good enough reason for a new name, but personally I doubt it.
However, the story doesn’t end here; there are various other opinions as to what microservices are, such as Chris Ford’s view, who says that:
“My own opinion is that microservice architectures can be understood through a single abstract architectural constraint which can be interpreted along many different degrees of freedom.
X can be varied independently of the rest of the system.”
The idea that something is separate and can be varied independently of the rest of the system is good, but I would hardly say it is a definition of anything, or at least of anything new. CSCI (Computer Software Configuration Item), which I first heard of as part of DOD-STD-2167A (published in 1988), essentially means the same thing: a CSCI is a component that can be varied independently of the rest of the system. In 2167A’s eyes it also means a very detailed, waterfall, documentation-laden process, which isn’t what anyone thinks services or microservices should entail – but it does demonstrate that “being varied independently” doesn’t mean much.
I am sure some readers are thinking something like “but wait, we’re talking about micro-services here – so they should also be small”. Indeed there are posts like James Hughes’s on microservices, with a quote like:
“First things first what actually is a micro service? Well there really isn’t a hard and fast definition but from conversations with various people there seems to be a consensus that a micro service is a simple application that sits around the 10-100 LOC mark.”
(Truth be told, in the very next sentence James says that LOC is an atrocious way to compare implementations, but I thought it was worth repeating due to the use of the words “there seems to be a consensus”.)
So how can you have 100-LOC services? You can get there if you rely on frameworks (like the Finagle or Sinatra James mentions) and generate serialization/deserialization code (protobuf, Thrift, Avro etc.) – this is essentially building on a smart Service Host. Another example would be developing in Erlang with its supervisor hierarchies, which also brings us to another way to reduce LOC: using languages that are less verbose (like the aforementioned Erlang, Python or Scala vs., say, Java).
I would say, however, that if you find you have a 10-lines-of-code service, you are, more likely than not, implementing a function as a service, and you don’t have a real service, micro or not – e.g. I can’t see it having the decentralized storage (and autonomy) mentioned in Martin’s and James’s article above, or the monitoring and instrumentation Hughes mentions.
You should also keep in mind that while better and cheaper networks allow us to push the limits, the fallacies of distributed computing still exist. Furthermore, having a lot of small services to manage, along with the performance hits of serialization and deserialization, security etc., may very well mean that you’ve moved from valid, smaller “micro” services into the realm of headache that I called “nanoservices”:
“Nanoservice is an antipattern where a service is too fine-grained. A nanoservice is a service whose overhead (communications, maintenance, and so on) outweighs its utility.”
So there we have it: for the most part, microservices is just another name for the principles of SOA. Another name might have been appropriate in the hype days of SOA, but I think these days most of that vapor has cleared and people understand better what’s needed. Furthermore, if we do want to call proper SOA by a new name, I think microservices is a poor term, as it leads down the slippery slope into nanoservices and 10 lines of code, which are just your old web-service methods executed by a fancy, chic host using a hot serialization format.
Micro or not, services should be more useful than the overhead they incur.
illustration by Les Chatfield
I’ve been working with Hadoop for a few years now, and the platform and ecosystem have been advancing at an amazing pace, with new features and additional capabilities appearing almost daily. Some changes are small, like better scheduling in Oozie; some are still progressing, like support for NFS; some are cool, like full support for CPython in Pig; but, in my opinion, the most important change is the introduction of YARN in Hadoop 2.0.
Hadoop was created with HDFS, a distributed file system, and the Map/Reduce framework, a distributed processing platform. With YARN, Hadoop moves from being a distributed processing framework into a distributed operating system.
“Operating system” sounded a little exaggerated when I wrote it, so, just for fun, I picked up the copy of Tanenbaum’s “Modern Operating Systems”* I have lying around from my days as a student. Tanenbaum says there are two views of what an OS is:
- A virtual machine – “…the function of the operating system is to present the user with the equivalent of an extended machine or virtual machine that is easier to program than the underlying hardware”
- A resource manager – “…the job of the operating system is to provide for an orderly and controlled allocation of the processors, memories, and I/O devices among the various programs competing for them”
Hadoop already had the first part nailed down in its 1.0 release (actually almost from its inception). With YARN it gets the second – so, again, in my opinion Hadoop now can be considered a distributed operating system.
So, YARN is Hadoop’s resource manager, but what does that mean? Well, previous versions of Hadoop were built around map/reduce (there were a few attempts at providing more computation paradigms, but m/r was the main and almost only choice). The map/reduce framework, in the form of the JobTracker and TaskTrackers, handled both the division of work and the management of the servers’ resources – in the form of the map and reduce slots each node was configured to have.
With Hadoop 2.0, the realisation that map/reduce, while great for some use cases, is not the only game in town, led to a better, more flexible, design that separates the responsibility of handling the computational resources from running anything, like map/reduce, on these resources. YARN, as mentioned above, is that new resource manager.
There’s a lot more to say about YARN, of course, and I highly recommend reading HortonWorks’ Arun Murthy’s excellent series of posts introducing it.
What I do want to emphasize is the effect this separation already has on the Hadoop ecosystem; here are a few examples:
- Storm on YARN – Twitter’s streaming framework made to run on Hadoop (Yahoo)
- Apache Samza – a Storm alternative developed from the ground up on YARN (Apache)
- HOYA – HBase on YARN, enabling on-the-fly clusters (Hortonworks)
- Weave – a wrapper around YARN to simplify deploying applications on it (Continuuity)
- Giraph – a graph processing system (Apache)
- Llama – a framework to allow external servers to get resources from YARN (Cloudera)
- Spark on YARN – Spark is an in-memory cluster computing engine for analytics
- Tez – a generalization of map/reduce to any directed acyclic graph of tasks (Hortonworks)
In summary – in my opinion, the introduction of YARN into the Hadoop stack is a game changer – and it isn’t some theoretical thing that will happen in the distant future. Hadoop 2.0 is now GA, so it is all right here, right now…
illustration by Elizabeth Moreau Nicolai on Born Librarian
*- ok, so I have an earlier edition than that :)
Every now and then I get questions by email. I usually just answer them directly, but considering I got two such questions this week, and that I haven’t blogged for a while (I do have a post about YARN which I hope to finish soon), I thought I’d also publish my replies here.
Question #1 from Simon:
In your very interesting article “Bridging the Impedance Mismatch Between Business Intelligence and Service-Oriented Architecture” you highlight the challenges for BI and SOA to co-exist – that was 6 or so years ago – have you seen any advances that would cause you to revise that view?
Question #2 from Marc:
Great book! I have some trouble with section 8.4. If a service directly maps to a table representation in the DB, I agree with you that it is the wrong thing. BUT, if a service is named ‘person’ and the result of a get or put (a1b2c3) is a document with all the person’s information, is it SOA or the old way? I personally implemented this at a large insurance company. This type of service is an entity service (for other authors, a thin service), but I think for you it is a coarse service. Coarse, of course, because there is a lot of information. But it is the most used and reused service (that’s an attribute of thin services in other books). In fact we tested this architecture deeply, and (because we make just one call) it is more efficient than the RPC way (not for one call, but it is better after two calls, e.g. person + addresses).
I think I am not using the old way – what’s your opinion?
PS: sorry for my poor English, I’m French!
Thanks for the compliment on the book :)
It doesn’t sound like what you’re doing is the “same old way” anti-pattern. The point of that anti-pattern is that if you just put an SOA name on something which is essentially an n-tier architecture, then it isn’t really SOA. The same goes for artificially inserting a “service layer” without taking the steps to separate and isolate services anywhere besides that layer.
In any event, when designing architectures, getting to SOA or REST or whatever is not a goal in itself. We can, and should, use design ideas from whatever paradigm fits and get something that is both a good match for our problem and a sustainable solution going forward.
The NoSQL moniker, coined circa 2009, marked a move away from the “traditional” relational model. There were quite a few non-relational databases around before 2009, but in the last few years we’ve seen an explosion of new offerings (you can see, for example, the “NoSQL landscape” in a previous post of mine). Generally speaking – and everything here is a wild generalization, since not all solutions are created equal and there are many types of solutions – NoSQL mostly means some relaxation of ACID constraints and, as the name implies, the removal of the Structured Query Language (SQL) both as a data definition language and, more importantly, as a data manipulation language, in particular SQL’s query capabilities.
ACID and SQL are a lot to lose, so NoSQL solutions offer a few benefits to compensate, mainly:
- Scalability – either relative scalability, meaning scaling cheaper than a comparable RDBMS at the same scale point, or absolute – as in scaling beyond what an RDBMS can. Scalability is usually achieved by preferring partition tolerance over consistency (in terms of Eric Brewer’s CAP theorem) and relying on “eventual consistency” (more on this later)
- Simpler models – i.e. the mapping of programming structures to storage structures is straightforward, thus avoiding the whole object/relational mapping quagmire (or, as Ted Neward called it, the “Vietnam of computer science”). I have to say that in my experience this is only a half-truth: it only holds up to a point, and when you need to scale and/or have high-performance requirements, you need to carefully design your schemas and it isn’t always “simple”
- Late-binding schemas – this is a real flexibility boon, as you can store data in forms close to their origin and apply the schema on read, so you can deliver poly-structured data and handle semi-structured data easily
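To make the last point concrete, here is a toy sketch of schema-on-read in plain Scala; the record layout and names are invented for illustration:

```scala
// Schema-on-read: raw records are stored as-is and a schema is applied
// only when reading. The CSV-ish layout here is an illustrative assumption.
val rawRecords = Seq( // stored close to the origin form
  "alice,2014-07-01,42",
  "bob,2014-07-02,17,extra-field-we-dont-care-about")

case class Event(user: String, date: String, value: Int)

// The "schema" is just a parse function chosen at read time; a different
// reader could project different fields from the very same bytes.
def readWithSchema(line: String): Event = {
  val f = line.split(",")
  Event(f(0), f(1), f(2).toInt)
}

val events = rawRecords.map(readWithSchema)
```

Note that the second record carries an extra field the reader simply ignores – that tolerance is exactly what makes late binding handy for poly-structured data.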
Eventual consistency and simple query mechanisms can work for a while, and for some use cases, but as adoption of NoSQL solutions gets more widespread we can see that the market needs more.
Eventual consistency means that if new updates stop flowing in, then after a while all reads will return the last updated value. As new updates rarely stop, and as “after a while” is not well defined, this is a rather weak guarantee, and we see efforts to provide stronger ones. Peter Bailis and Ali Ghodsi published a good paper called “Eventual Consistency Today: Limitations, Extensions, and Beyond”, where they go over some of the options. The NoSQL landscape is too wide to say this happens everywhere, but some solutions are moving in this direction; for example, in HBase (the NoSQL store I’ve used most in the past few years) we’ve seen the addition of Multi-Version Concurrency Control, which provides ACID guarantees for single-row operations (and can be tuned down for performance).
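The definition can be illustrated with a toy last-writer-wins register – a deliberate simplification that no real store implements exactly like this:

```scala
// Toy model of eventual consistency: replicas accept timestamped writes
// in any order and converge via last-writer-wins once updates stop.
final class Replica {
  private var value: Option[String] = None
  private var stamp = -1L
  def write(v: String, ts: Long): Unit =
    if (ts > stamp) { value = Some(v); stamp = ts } // last writer wins
  def read: Option[String] = value
}

val r1 = new Replica
val r2 = new Replica
// The same two updates arrive at the replicas in different orders...
r1.write("a", ts = 1); r1.write("b", ts = 2)
r2.write("b", ts = 2); r2.write("a", ts = 1)
// ...yet once the updates stop, both replicas read the same value.
```

The weak part of the guarantee is visible even here: while updates keep flowing, the two replicas can disagree for an unbounded “while”.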
Nevertheless, providing real guarantees under real conditions can prove rather tricky. I highly recommend reading Kyle Kingsbury’s great series of posts on Jepsen, where he looks at how Postgres, MongoDB, Redis and Riak handle writes under network partitions.
When we look at the NoSQL space, we see a lot of the technologies getting better, more advanced query languages – e.g. MongoDB’s find has gained some nice features, and Cassandra’s query language is at its third version – but one technology where introducing queries in general, and SQL specifically, is turning from a trend into a stampede is Hadoop. Hadoop has a multi-vendor, multi-distro ecosystem (not unlike Linux), and it seems each and every one of them wants to introduce its own SQL solution: Cloudera offers Impala, Hortonworks is working on the Stinger initiative to enhance Hive, Pivotal (née EMC Greenplum) has Hawq, IBM is working on BigSQL, and even Salesforce.com (which does not offer a distro) offers an SQL skin for HBase called Phoenix. The last Hadoop Summit had a panel where some of these players debated the merits of their respective platforms, which is worth listening to.
The examples I’ve given above are mainly around Hadoop – naturally, as this is the environment I work with, I am more familiar with it – but, more importantly, it seems Hadoop has managed to position itself as the main NoSQL, large-scale (a.k.a. big data) solution, and as such this “reSQL” trend is most apparent there; it will (and does) also affect other NoSQL offerings.
The thing is that NoSQL dropped SQL capabilities for simplicity, and wider adoption draws all the capabilities and complexity back. I guess the main problem is that the situation is even more complicated when we’re also dealing with big data and its implications (e.g. late-binding schemas vs. the schema needs of the *structured* query language; immovable or hard-to-move data vs. joins, etc.)
“We are pleased to announce that a book published by Manning, SOA Patterns, by Arnon Rotem-Gal-Oz, has been selected for Intel Corporation’s Recommended Reading List for 2H’13. Congratulations!
Our Recommended Reading Program partners with publishers worldwide to provide technical professionals a simple and handy reference list of what to read to stay abreast of new technologies. Dozens of industry technologists, corporate fellows, and engineers have helped by suggesting books and reviewing the list. This is the most comprehensive reading list available for professional computer developers and IT professionals.”
In the last few years we have seen the advent of highly distributed systems. Systems with clusters of many servers are no longer the sole realm of the Googles and Facebooks of the world, and we are beginning to see multi-node and big data systems in enterprises. For example, I don’t think a company such as Nice (the company I work for) would have released a Hadoop-based analytics platform and solutions – something we did just last week – 5-6 years ago.
So now that large(r) clusters are more prevalent, I thought it would be a good time to reflect on the fallacies of distributed computing: how/if they are still relevant, and whether they should be amended.
If you don’t know about the fallacies, you can see the list and read the article I wrote about them at the link mentioned above. In a few words, I’d just say that these are statements, originally drafted by Peter Deutsch, Tom Lyon and others in 1991-2, about failed assumptions we are tempted to make when working on distributed systems, which turn out to be fallacies and cost us dearly.
So the fallacies help us keep in mind that distributed systems are different, and they do seem to hold, even after the 20 years that have passed. I think, however, that working with larger clusters we should also consider the following three as fallacies we’re likely to assume:
- Instances are free
- Instances have identities
- Map/Reduce is a panacea
Instances are free
A lot of the new technologies of the big-data and NoSQL era bring with them the promise of massive scalability: if you see a performance problem, you can just (a famous lullaby word) add another server. In most cases that is even true – you can indeed add more servers and get better performance. What these technologies don’t tell you is that instances have costs. More instances mean increased TCO, starting with management effort (monitoring, configuring etc.), as well as operations costs – whether for the hardware, the rented space and electricity in a hosted solution, or the usage by hours in a cloud environment. So, from the development side of the fence, the solution is easy: add more hardware. In reality, sometimes it is better to make the effort and optimize your code/design. Just the other week we got a more than tenfold improvement in query performance by removing query parts that were no longer needed after a change in the system’s data flow – that was way cheaper than adding 2-3 more nodes to achieve the same result.
Instances have identities
I remember, sometime in the Jurassic age, when I set up a network for the first time (a Novell NetWare 3.11, if you must ask), it had just one server. Naturally, that server was treated with a lot of respect. It had a printer connected, it had a name, nobody could touch it but me. One server to rule all them clients. Moving on, I had server farms, so a list of random names became a problem and we started to use themes like gods or single malts (“can you reboot the Macallan, please?”). Anyway, that’s all nice and dandy, and if you are starting small with a (potentially) big data project you might be tempted to do something similar. If you are tempted – don’t. When you have tens of servers (and naturally even worse when you have hundreds or thousands), you no longer care about the individual server. You want to look at the world as pools of server types: you have a pool of data nodes in your Hadoop cluster, a pool of application servers, a pool of servers running configuration X and another with configuration Y. You’d need tools like Abiquo and/or Chef and/or Ansible or similar products to manage this mess. But again, you won’t care much about the XYZ2011 server, and even if it runs Tomcat today, tomorrow it may make more sense to make it part of the Cassandra cluster. What matters are the roles in the pools of resources, and that the pool sizes will be enough to handle the capacity needed.
Map/Reduce is a panacea
Hadoop seems to be the VHS of large clusters. It might not be the ultimate solution, but it does seem to be the one that gets the most traction – a lot of vendors, old (like IBM, Microsoft, Oracle etc.) and new (Hortonworks, Cloudera, Pivotal etc.), offer Hadoop distros and many other solutions offer Hadoop adaptors (MongoDB, Cassandra, Vertica etc.) and Hadoop, well, Hadoop is about the distributed file system and, well, map/reduce.
Map/Reduce, which was introduced in 2004 by Google, is an efficient algorithm for going over a large distributed data set without moving the data (map) and then producing aggregated or merged results (reduce). Map/Reduce is great and it is a very useful paradigm applicable to a large set of problems.
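To make the two-phase idea concrete, here is a toy word count in pure python (nothing Hadoop-specific, just the shape of the paradigm): the “map” runs independently on each partition, where the data lives, and the “reduce” merges the small partial results – no raw data gets moved around.

```python
# Two partitions of raw data, as they would sit on two different nodes.
partitions = [["a b", "a"], ["b b"]]

def map_partition(lines):
    # Runs locally next to the data; produces a small partial result.
    counts = {}
    for line in lines:
        for word in line.split():
            counts[word] = counts.get(word, 0) + 1
    return counts

def merge(total, partial):
    # The reduce step: merge partial counts into a final result.
    for word, n in partial.items():
        total[word] = total.get(word, 0) + n
    return total

partials = [map_partition(p) for p in partitions]  # the map phase
result = {}
for p in partials:                                 # the reduce phase
    result = merge(result, p)
```

Only the tiny partial dictionaries cross the (imaginary) network here, which is exactly why the paradigm scales – and also why it fits poorly when you need many passes over the same data.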
However, it shouldn’t be the only tool in your tool set, as map/reduce is inefficient when there’s a need to do multiple iterations on the data (e.g. graph processing) or when you have to do many incremental updates to the data but don’t need to touch all of it. Also there’s the matter of ad-hoc reports (which I’ll probably blog about separately). Google solved these in Pregel, Percolator and Dremel in 2009/2010 and now the rest of the world is playing catchup as it did with map/reduce a few years ago – but even if the solutions are not mature yet, you should keep in mind that they are coming.
Instances are free; Instances have identities; and map/reduce is a panacea – these are my suggested additions to the fallacies of distributed computing when talking about large clusters. I’d be happy to hear what you think and/or if there are other things to keep in mind that I’ve missed.
I just got a notice from Manning that my book SOA patterns will be featured as “deal of the day” on Apr 14th – that means that it will be available for 50% off starting Midnight US ET of April 14th (and considering it’s a world-wide offer it would actually last for more than 24 hours).
To get the 50% discount use code dotd0414au at www.manning.com/rotem
If you’re not familiar with my book (which I guess is unlikely if you’re reading my blog, but anyway), you might want to check out the SOA Patterns page on my site, read one or more of the pattern drafts, or check out the book reviews.
Reviews of SOA patterns
- Cameron McKenzie @ TheServerSide.com
- Tad Anderson @ Java Developers Journal
- Roberto Casadei @ robertocasadei.it
- Colin Jack @ losTechies (half a book review)
- Jan Van Ryswyck @ ElegantCode.com (half a book review)
- Karsten Strøbæk @ strobaek.org
- Reviews on Amazon
Even though I mostly sit at work trying to look busy, every so often someone does stumble into my office with a question or a problem, so I’ve got to do something.
Interestingly enough, a lot of problems can be handled by some pretty basic stuff, like reminding people that a .jar/.war file is a zip file and you can take a look inside for what’s there or what’s missing; or sending people to read the log files (turns out these buggers actually contain useful information) etc. – so now for today’s lesson: “It’s open source, so the source, you know, is open…”
We use a lot of open source projects at Nice (we’ve also, slowly, started to give something back to the community, but that’s another story). One of these is HBase. One of our devs was working on enabling and testing compression on HBase. Looking at the HBaseAdmin API (actually the column descriptor) he saw there was an option for setting the compression of a column family and an option for setting the compression used during compactions. The question he came with was: do I know how it behaves when you set one and not the other, and how do they work together?
Well, I know about HBase compression but I hadn’t heard about compaction compression, and the documentation on this is, well, lacking. Luckily HBase is an open source project, so I took a peek. I started with HFile.java, which reads and writes HBase data to Hadoop. It seems that the writer gets a compression algorithm as a parameter and that the reader gets the compression algorithm from the file’s header, so essentially different HFiles can have different compressions and HBase will not care. We start to see the picture, but to be sure we need to see where the compression is set, so we look in the regionserver’s Store.java file and see how the column family’s compression and compaction-compression settings are picked up when new files are written.
Bottom line: reading through the HBase code I was able to understand exactly how the feature in question behaves and also get a better understanding of the internal workings of HBase (HFiles describe their own structure, so different files can have different attributes like compression etc.)
Another example of how reading code can help is using Yammer’s monitoring library, metrics. Building the monitoring solution for our platform we also collect JMX counters (like everybody else, I guess :) ). So I stumbled upon metrics, and the manual did a good job of showing the different features and explaining why this is an interesting library. I asked one of our architects to POC it and see if it is a good fit. He tried, but it so happens that it is rather hard to understand how to put everything together and actually use it just from the documentation. Luckily the metrics code has unit tests (not all of it, by the way, which is a shame, but at least enough of it), including tests that show how to instrument a jersey service.
Again, we see that having the code available is a great benefit. You don’t have to rely on the documentation being complete (something we all do so well, but those other people writing code don’t, so, you know..) or hope for a good samaritan to help you on stack overflow or some other forum. And that’s just from reading the code… imagine what you could do if you could actually offer fixes to problems you encounter. But, oh wait, you can…
Ok, I think I’ve done enough for today, got to get back to trying to look busy
illustration licensed under creative commons attribution 2.5 by opensource.org
The past week or so we got some new data that we had to process quickly. There are quite a few technologies out there to quickly churn out map/reduce jobs on Hadoop (Cascading, Hive, Crunch, Jaql to name a few of many); my personal favorite is Apache Pig. I find that the imperative nature of Pig makes it relatively easy to understand what’s going on and where the data is going, and that it produces efficient enough map/reduces. On the down side, Pig lacks control structures, so working with Pig also means you need to extend it with user defined functions (UDFs) or Hadoop streaming. Usually I use Java or Scala for writing UDFs, but it is always nice to try something new, so we decided to check out some other technologies – namely perl and python. This post highlights some of the pitfalls we met and how to work around them.
Yuval, who was working with me on this mini-project, likes perl (to each his own, I suppose), so we started with that. Searching for pig and perl examples, we found something like the following:
A = LOAD 'data';
B = STREAM A THROUGH `stream.pl`;
The first pitfall here is that the perl script name is surrounded by a backtick (the character on the tilde (~) key) and not a single quote (so in the script above ‘data’ is surrounded by single quotes and `stream.pl` is surrounded by backticks ).
The second pitfall was that the code above works nicely when you use pig in local mode (pig -x local) but it failed when we tried to run it on the cluster. It took some head scratching and some trial and error but eventually Yuval came with the following:
DEFINE CMD `perl stream.pl` ship('/PATH/stream.pl');
A = LOAD 'data';
B = STREAM A THROUGH CMD;
Basically we’re telling Pig to ship the perl script to the cluster (via HDFS) so that it would be accessible on all the nodes.
So, perl worked pretty well, but since we’re using Hadoop Streaming and get the data via stdin we lose all the context of the data that pig knows. We also need to emulate the textual representations of bags and tuples so the returned data will be available to pig for further work. This is all workable but not fun to work with (in my opinion anyway).
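Regardless of the language, a streaming script is just a stdin-to-stdout filter. A minimal sketch in python (the two-column field layout here is made up for illustration) shows what that means in practice – everything arrives as plain text, with none of the schema pig knows about:

```python
import sys

def process_line(line):
    # Everything arrives as tab-delimited text - any schema pig had is lost here,
    # so we parse and convert types by hand.
    caller, duration = line.rstrip("\n").split("\t")
    # Emit a tab-delimited row back to pig; bags/tuples would need to be
    # rendered in pig's textual {..}/(..) syntax ourselves.
    return "%s\t%d" % (caller, int(duration) * 2)

if __name__ == "__main__":
    for line in sys.stdin:
        print(process_line(line))
```

The manual parsing and formatting on both ends is exactly the "not fun" part mentioned above.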
I decided to write the pig UDFs in python. Python can be used with Hadoop Streaming, like perl above, but it also integrates more tightly with Pig via jython (i.e. the python UDF is compiled into java and shipped to the cluster as part of the jar Pig generates for the map/reduce anyway).
Pig UDFs are better than streaming as you get Pig’s schema for the parameters and you can tell Pig the schema you return for your output. UDFs in python are especially nice as the code is almost 100% regular python and Pig does the mapping for you (for instance, a bag of tuples in pig is translated to a list of tuples in python, etc.). Actually the only difference is that if you want Pig to know about the data types you return from the python code you need to annotate the method with @outputSchema – e.g. a simple UDF that gets the month as an int from a date string in the format YYYY-MM-DD HH:MM:SS:
from datetime import datetime

@outputSchema("num:int")
def getMonth(strDate):
    try:
        dt, _, _ = strDate.partition(".")
        return datetime.strptime(dt, "%Y-%m-%d %H:%M:%S").month
    except (AttributeError, IndexError, ValueError):
        return 0
Using the UDF is as simple as registering the python file where the UDF is defined. Assuming our UDF is in a file called utils.py, it would be declared as follows:
Register utils.py using jython as utils;
And then using that UDF would go something like:
A = LOAD 'data' using PigStorage('|') as (dateString:chararray);
B = FOREACH A GENERATE utils.getMonth(dateString) as month;
Again, like in the perl case, there are a few pitfalls here. For one, the python script and the pig script need to be in the same directory (relative paths only work in local mode). The more annoying pitfall hit me when I wanted to import some python libs (e.g. datetime in the example, which is imported using “from datetime import datetime”). There was no way I could come up with to make this work. The solution I did come up with eventually was to take a jython standalone .jar (a jar with the common python libraries included) and replace Pig’s jython jar (in the pig lib directory) with the standalone one. There’s probably a nicer way to do this (and I’d be happy to hear about it) but this worked for me. It only has to be done on the machine where you run the pig script, as the python code gets compiled and shipped to the cluster as part of the jar file Pig generates anyway.
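As an illustration of the bag/list mapping mentioned above, here is a hypothetical UDF (the schema and names are mine, not from our actual project) that takes a bag of destination tuples and returns the top destinations with their counts. Since it is almost plain python, it can also be unit tested locally before going anywhere near the cluster; the try/except around the decorator import is a local-testing convenience, as under Pig the @outputSchema annotation is provided for you.

```python
from collections import Counter

try:
    from pig_util import outputSchema  # provided when running under Pig
except ImportError:
    def outputSchema(schema):          # no-op stand-in for local testing
        def wrap(f):
            return f
        return wrap

@outputSchema("dests:bag{t:tuple(dest:chararray,cnt:int)}")
def topDestinations(calls, n):
    # A pig bag arrives as a python list of tuples, e.g. [("NYC",), ("LA",)].
    if not calls:
        return []
    counts = Counter(call[0] for call in calls)
    # A list of (value, count) tuples maps back to a bag of tuples in pig.
    return counts.most_common(n)
```

In the pig script this would be used just like getMonth above, after registering the file with jython.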
It has been a few months since SOA Patterns was published, and so far the book has sold somewhere between 2K-3K copies, which I guess is not bad for an unknown author – so first off, thanks to all of you who bought a copy (by the way, if you found the book useful I’d be grateful if you could also rate it on Amazon so that others would know about it too).
I know at least a few of you actually read the book, as from time to time I get questions about it :). Not all the questions are interesting to “the general public” but some are. One interesting question I got is about the so-called “Canonical schema pattern“. I have a post in the making (for too long now, sorry about that Bill) that explains why I don’t consider it a pattern and why I think it verges on being an anti-pattern. Another question I got more recently, which is also the subject of this post, was about the Saga pattern.
Here is (most of) the email I got from Ashic :
“Garcia-Molina’s paper focuses on failure management and compensation so as to prevent partial success. It discusses a variety of approaches – with an SEC, with application code outside of the database, backward-forward and even forward-only (the latter having no “compensate” step per activity, rather a forward flow that takes care of the partial success). Nowadays, I see two viewpoints regarding sagas:
1. People calling process managers sagas, which is obviously incorrect. [e.g. NServiceBus "sagas".]
2. People focusing very strongly on a “context” of work, whereby the context gets passed around from activity to activity. For linear up front workflows, routing slips are an easy solution. An example of this can be found at Clemens’s post here: http://vasters.com/clemensv/2012/09/01/Sagas.aspx . For more complicated workflows, graph-like slips may be used.
After discussing with some enthusiasts, they seem very keen to suggest that the context has to move along. They seem to reject the notion of a saga where a central coordinator controls the process. In other words, even if a process manager takes care of only routing messages, and that routing includes compensations to alleviate partial successes, they are unwilling (sometimes vehemently) to call that a saga. They acknowledge it can be useful, but say that is not a saga. I find this to be confusing. In this case the process manager acts as the SEC would in a Garcia-Molina saga capable database. This approach still allows interleaved transactions (or steps) without a global lock. Why would this not be a saga?
In your book, I did see you mentioned orchestration as a way of implementing sagas. However, when this was brought up, the proponents of point 2 suggest that that is not what you really mean. To me it seems quite clear, and it aligns with Hector’s paper. I just want to make sure I have this right. I’d love your thoughts on this.”
Let’s start with the answer to the question:
When I think about the Saga pattern I see it as the application of the notions in the Garcia-Molina paper (which talked about databases) to SOA. In other words, I see sagas as the notion of getting distributed agreement of a process with reduced guarantees (vs. distributed transactions that propose ACID guarantees across systems). So, basically, a Saga is a loose transaction-like flow where, in case of failures, the involved services perform compensation steps (which may be nothing, a complete undo, or something else entirely). The Saga pattern can augment this process with temporary promises (which I call reservations).
Under this definition both centrally managed processes and “choreographed” processes are Sagas – as long as the semantics and intent mentioned above are kept. The centrally managed orchestration provides visibility of processes, ease of management etc.; the cooperative, event based, context-shared sagas provide flexibility and allow serendipity of new processes. Both have merit and both have a place, at least in my opinion :)
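A toy sketch of these semantics (all names made up): each step pairs an action with a compensation, and when a step fails the completed steps are compensated in reverse order. Note that nothing here dictates whether this loop lives in a central orchestrator or is spread across event handlers and a routing slip – that is a design decision, not part of the pattern:

```python
def run_saga(steps):
    """steps is a list of (action, compensation) pairs of callables."""
    completed = []
    for action, compensation in steps:
        try:
            action()
            completed.append(compensation)
        except Exception:
            # Reduced guarantees: no global lock or rollback, just
            # compensate whatever already succeeded, in reverse order.
            for compensate in reversed(completed):
                compensate()
            return False
    return True
```

A compensation here may be a no-op, a full undo, or something else entirely – exactly as in the definition above.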
The main reason both of these, very different, approaches are valid designs and implementations of the Saga pattern is that the Saga pattern (like others in the book) is an architectural pattern and not a design pattern. Which brings us to the second reason for this post: the difference between “architecture” and “design”. In a nutshell, architecture is a type of design where the focus is quality attributes and wide(r) scope, whereas design focuses on functional requirements and more localized concerns.
The Saga pattern is an architectural pattern focused on the integrity and reliability quality attributes, and it pertains to the communication patterns between services. When it comes to designing the implementation of the pattern, you need to decide how to implement the concerns and roles defined in the pattern – e.g. controlling the flow and the status of the saga. One decision can be to implement it centrally and use orchestration; another can be to decentralize it and use a shared context…
Design decisions can be very meaningful – sometimes it can be hard to find what’s left of the architecture. Consider, for example, the whole idea behind blogging and RSS feeds. The architectural notion is a publish/subscribe system where the blog writer publishes an “event” (a new post) and subscribers get a copy. When it came to design and implementation, considering it was implemented on top of HTTP and REST, where there is no publish/subscribe capability, it was actually designed as a pull system where the publisher provides a list of recent changes (the feed) and subscribers sample it and check if anything changed since the last time. So architecturally pub/sub; in design, a pull model with a centralized server that exposes the latest changes – a really big difference.
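A caricature of that pull design in a few lines of python (the feed structure is made up): the “publisher” just exposes recent items, and each subscriber does the work of diffing against what it already saw.

```python
# What the "publisher" exposes: a list of recent changes (the feed).
feed = [
    {"id": 1, "title": "old post"},
    {"id": 2, "title": "new post"},
]

def poll(feed, last_seen_id):
    # The subscriber samples the feed and keeps only what's new -
    # no events are pushed anywhere.
    return [entry for entry in feed if entry["id"] > last_seen_id]

fresh = poll(feed, last_seen_id=1)
```

Architecturally the subscriber "receives" new posts; in the design it is the one doing all the asking.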
Does it matter at all? I think yes. Architecture lets us think about the system at a higher level of abstraction and thus tackle more complex systems. When we design and focus on more local issues we can tackle the nitty gritty details and make sure things actually work. We need to check the effects of design on architecture and vice versa to make sure the whole thing sticks together and actually does what we want/need.
Note that architecture and design are not the complete story – another variable is the technology (e.g. HTTP in the example above) which affects the design decision and thus also the architecture (you can read a little more about it in my posts on SAF)
illustration by Mads Boedker
One of our team leaders approached me in the hall today and asked if I could lend a hand in troubleshooting something. He and our QA lead were configuring one of our test Hadoop clusters after an upgrade and they had a problem with one table they were trying to set up:
- When they tried to create the table in HBase shell they got an error that the table exists
- When they tried to delete the table they got an error that the table does not exist
- HBase ships with a health-check and fix util called hbck (run it with: hbase hbck; see here for details) – they ran it and it reported that everything is fine and dandy
Hmm. The first thing I tried to do was to look at the .META. table. This is where HBase keeps the tables and the regions they use. I thought maybe there was some junk there, but it didn’t look like that. I tried to do a major compaction for it and that didn’t help either.
The next thing I tried actually found the problem. I ran the Zookeeper client (I used hbase zkcli but you can also run it via the zookeeper scripts) and looked at /hbase/table (ls /hbase/table) – the zombie table was listed right there with all the legit tables. HBase stores some schema and state data for each table in zookeeper to be able to coordinate between all the regionservers, and it seems that during the upgrade process the system was restarted a few times. One of these restarts coincided with a removal of the table and caught it in the middle.
Ok, so that is the problem – what’s the solution? Simple: just remove the offending znode from zookeeper (rmr /hbase/table/TABLE_NAME) and restart the cluster (since the data is cached in the regionservers/hbase master to save trips to zookeeper). Also, be careful not to remove any other node or you’d cause problems for other tables.
The role of ZooKeeper in HBase is not documented very well. The only online account of ZooKeeper’s role with HBase I found (save looking at the code itself of course) is really outdated. Hopefully this post will save some head scratching and time for others who find themselves with the same problem.
illustration by jamesrdoe
In the last year and a half or so (since I joined Nice Systems) we’ve been hard at work building our big data platform based on a lot of open source technologies, including Hadoop and HBase and quite a few others. Building on open source brings a lot of benefits and helps cut development time by building on the knowledge and effort of others.
I personally think that this has to be a two-way street: as a company benefits from open source it should also give something back. This is why I am very happy to introduce Nice’s first (hopefully first of many) contribution back to the open source community – a UI dev tool for working with HBase called h-rider. H-rider offers a convenient user interface to poke around data stored in HBase, which our developers find very useful both for development and debugging.
h-rider is developed and maintained by Igor Cher, one of the best developers.
Here is some more blurb from the h-rider git wiki:
What is h-rider?
The h-rider is a UI application created to provide an easier way to view or manipulate the data saved in the distributed database - HBase™ - that supports structured data storage for large tables.
To get started, begin here:
- Learn about h-rider by reading the manual.
- Download h-rider from the release page (or git clone from the repo)
I was poking around my old blog (rgoarchitects.com) and I found this post from 2007 which I think is worth re-iterating:
In a post called “Ignorance vs. Negligence“, Ayende blows off some steam about some of the so-called “professionals” that he met along the way. You know… those with a fancy title who don’t know jack and design some of the nightmares we see from time to time. I’ve seen this phenomenon in a lot of projects I consulted on/reviewed:
- The senior security expert who recommended something which isn’t supported by the platform
- The senior architect who threw the system down to hell by basing the whole system on a clunky asynchronous solution that should only be used by a tiny portion of the application.
- The geniuses that built this wonderful code generator that generated code with so many dependencies and singletons that made the solution unusable
- The chief architect that created this wonderful performance hog, and then kept poking around to make sure we don’t fix it too much.
- The architect that partitioned a distributed solution based on functions – so that each and every business process has to go through all the tiers and components. The solution made everything more complicated by a few orders of magnitude (scale, synchronization, availability, performance, what not)
- The architect that designed his own distributed transaction mechanism (basically duplicating COM+) – naturally with less than satisfactory results…
“They all have a few things in common, they represent themselves as experts, senior, knowledgeable people. In all those cases, they have actively acted to harm the business they were working for, by action, inaction or misaction
I have no issue with people not knowing any better, but I do expect people that ought to know better to… actually do know better.”
I don’t think there is negligence involved here – I think all of these people wanted to do the right thing; they probably believed they were right. They were probably also pretty good at the past jobs that led them to their current position.
What they didn’t learn is to “know that they don’t know”. This is a hard lesson to learn. I hope I learned my lesson after the first time I tried to distribute a (naive) solution I was so proud of. Well, at least I stayed around for enough time to both see the results and learn how to fix the problem.
I think not staying around for long enough is one of the reasons for this ignorance – since at the onset things usually look good enough and seem to work. If by the time the problem rears its ugly head you’ve already moved on to a new shiny job, you’ve missed the opportunity to learn from your mistakes.
Another cause for ignorance is not looking around and learning only from your own experience. For instance, I am now interviewing a lot of people, and when I ask a question like “tell me about something interesting you recently read – a book, an article, a blog, anything” – I usually get blank stares. Few people tell me about an article they read that is related to a problem they had, and fewer still tell me about something without a direct relation to their work. If you don’t look beyond the keyboard you will never know better. Learning only from your mistakes can be problematic – especially if we also consider the previous point (people don’t stay around).
Ignorance is bliss, they say; maybe so – but I think ignorance has a lot to do with the crappy systems we see all around us, and it’s one of the reasons writing software stays more of an art than a science or craft.
illustration by parl
Here’s the NoSQL landscape in 3 slides (and hey, at least mine looks different :) )
451 research published their view of the NoSql/NewSql world in a unified diagram.
And here’s mine from SOA Patterns chapter 10 (discussing “SOA & big data”)