Solving Stability Problems in a Hadoop Cluster — Big Data with Small Data

Eyal Yoli Abs · Published in DataDrivenInvestor · 7 min read · Nov 7, 2019


Fish in the lake. Image by Quang Nguyen Vinh.

I am currently the team leader of the CyberLake big data team.

Initially, the project started with “small data” from only one input source (as is probably true of any new project). By the time we deployed to production, we had four input sources.

Later on, there would be an enormous increase in our sources, and scaling problems would arise…

The story behind it

In the beginning, the team struggled with performance issues while setting up the infrastructure (Flafka, i.e. Flume integrated with Kafka, Hadoop, and so on) to ingest and store the sources. Nevertheless, they did a great job learning the basics, overcoming these hurdles, and making the best of the servers we had.

When I took over as team leader, we had about nine streaming sources and one batch input, ingested, parsed and stored in Parquet format in HDFS.

We didn’t have any scaling problems, since the last performance issues had been solved. There was one massive source that we couldn’t scale for, and at that time we ingested it with a simple Scala application that scaled better, with some compromises (anyhow, that’s a story for another time).

A wave of new data was coming at us while we were blindfolded to the volume and throughput of every source; we didn’t even know how many sources were going to be connected. The newly connected sources were the responsibility of a third-party organization that we had no direct contact with (a middleman team, in the same organization as ours, was the point of contact for this operation).

New data sources were arriving unexpectedly day by day, and all I thought was, “We can ingest them ALL! [inner devil]”. You may laugh at me now, but ultimately I was right!

At that time, our mission was to ingest this new data ASAP. That said, this ingestion was what we called “raw ingest”: getting the data and storing it as plain text, a temporary stage until we deployed a suitable parser.

To parse the data and transform it into Parquet format, we used Apache Spark. Although we had only a little experience with Spark, overall we had some parsers running as Spark Streaming jobs and others as Spark batch jobs. A simplified sketch of what such a batch parser might look like follows.
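
The sketch below is illustrative only: the paths, the delimiter and the three-column schema are hypothetical, not our actual sources. It just shows the general shape of a batch parser that reads raw text dropped by the ingest layer and rewrites it as Parquet.

```scala
// A minimal sketch of a batch parser: raw text in, Parquet out.
// Paths, delimiter and schema are hypothetical, for illustration only.
import org.apache.spark.sql.SparkSession

object RawSourceParser {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("raw-source-parser")
      .getOrCreate()
    import spark.implicits._

    // Raw text as dropped by the ingest layer: one delimited record per line.
    val raw = spark.read.textFile("hdfs:///data/raw/sourceA/2019/11/07/13")

    // Parse each line into typed columns, dropping malformed records.
    val parsed = raw
      .map(_.split('|'))
      .filter(_.length == 3)
      .map(f => (f(0), f(1), f(2).toLong))
      .toDF("id", "event", "timestamp")

    // Store the result as Parquet next to the other parsed sources.
    parsed.write.mode("append")
      .parquet("hdfs:///data/parsed/sourceA/2019/11/07/13")

    spark.stop()
  }
}
```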

At some point, we had about 23 sources connected (10 parsed from before and 13 new raw ones). A temporary path was set up for all the new raw data, separate from the parsed data.

In general, all our HDFS paths are partitioned by date and time, down to the hour, per source (the leaf directory represents an hour of the day, its parent a day, and so on), along the lines of the sketch below.
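
For illustration only (the base paths and source names here are hypothetical, not our real layout), the leaf directory for one source and one hour could be derived like this:

```scala
// Hypothetical helper that derives the hourly leaf directory for a source.
// Layout: <base>/<source>/<year>/<month>/<day>/<hour>
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object HourlyPaths {
  private val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")

  def hourlyPath(base: String, source: String, ts: LocalDateTime): String =
    s"$base/$source/${ts.format(fmt)}"
}

// HourlyPaths.hourlyPath("hdfs:///data/parsed", "sourceA",
//   LocalDateTime.of(2019, 11, 7, 13, 0))
// => "hdfs:///data/parsed/sourceA/2019/11/07/13"
```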

The architecture is simple and it is illustrated in the diagram below:

General architecture of our project.

The action begins

Learning the basics is sufficient for the first use cases. However, what the basics don’t cover is how to scale. When we tried to ingest another big source (similar to the one before), we started to have stability issues that began in Flume and ended up in HDFS.

What is hard about scaling problems is detecting them. You won’t get a lovely error in the log saying, “ERROR: Scaling problem just occurred! Please fix this ASAP”. You just come across some weird phenomena.

About a month before we found the solution, we started to get unexplained crashes of the Flume services. We had three Flume instances running for high availability (HA), and “randomly” one of these instances would drop dead for a couple of minutes before being restarted by the monitoring agents deployed on the servers. These crashes were recorded but never triggered any notification (since they were classified as warnings). One of my teammates stumbled on the problem while reviewing the monitoring history.

It was very alarming to see the same crashes recur nearly every day or two. I asked the same teammate to investigate the reason behind them, but all we got were error logs about out-of-memory exceptions and unexpected exits. Such errors are ambiguous and hard to follow.

After a day of investigation, we couldn’t find any lead toward a solution. Besides, neither management nor the customers were bothered: “As long as the data is intact, it’s working OK…”

Our demanding ongoing tasks led us to patch the problem by enlarging the Flume JVM heap, forget about it, and continue with our lives.

The temporary fix gave us some peacetime, but it is exactly in such quiet times that you miss something important. In our case, we had only limited HA: not all sources were sending to all three agents (due to some technical limitations we had and later overcame, another story for another time).

One day, one of our users came across a hole in our data! He asked us to figure out what had happened. We checked thoroughly and found that what used to be brief Flume restarts had become downtimes of almost half an hour; moreover, the crashes were more frequent and hit more than one instance at a time.

Getting to the problem

In such chaotic circumstances, you can be sure that everybody will become aware of the issue and, even worse, will start pressing everyone for a solution.

I will not go step by step through the solution to this specific problem, because while all stability problems look similar, each is unique to the architecture and configuration you have.

The hardest part was understanding the logs and focusing your attention on the root problem, not its symptoms. When you read logs full of “out of memory exception,” “HDFS I/O flush error” and loads of “timeouts,” you feel lost. Quickly you start to doubt everything you know about Flume, Kafka, HDFS, even your own code.

The answer in such situations is to focus on the story! The logs are just trying to tell a story, and reading each log on its own makes you miss the bigger picture. Reading one line, or even a sequence of lines, to understand a problem works great when you operate standard non-distributed applications; here, it will not serve you anymore. Going through thousands of log lines from multiple places to connect the story seems unreasonable at first, but you get there when there is no alternative.

The diagram below explains the story of the problem:

An illustration trying to explain the problem.

So all the logs we had gone through were just symptoms of the problem. The root issue was indicated in the HDFS logs as a WARNING! The log said, “Detected pause in JVM or host machine (eg GC): pause of approximately 52372ms blah blah blah.”

Well, it seems that warnings (and even their parentheses) are a crucial part of this story! They fill in the missing puzzle pieces.

The JVM garbage collector (GC) of the HDFS daemons was taking too much time to do its work. While the GC is running, you can’t contact HDFS; it simply stops responding. Here it paused for an average of 37.8 seconds, 53 times a day (especially at peak times), on every DataNode, which adds up to roughly half an hour of unresponsiveness per node per day. I’m sure that is not a good sign (imagine hundreds of connections from Flume in one minute, half of them failing at every pause).

Why did this happen? And how did we get there? This is what happens when you forget the basic assumptions of your software. Our scaling problems started when we forgot that HDFS is meant for storing large files. And this is where we failed our HDFS: we stored the raw sources directly, which meant thousands of files of 8 KB to 2 MB each; which meant almost every file occupied its own block, and every file and block an object in the HDFS heap; which meant a very large heap; which meant the heap filled up and the GC had a hard time; which meant HDFS was unresponsive; which meant Flume’s throughput to HDFS was low compared to the sources’ throughput into Flume; which meant Flume ran out of memory and thus crashed and restarted.
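
To get a feel for the numbers, here is a back-of-the-envelope sketch. The figures in it are my own illustrative assumptions, not measurements from our cluster; the only “real” ingredient is the widely quoted rule of thumb that HDFS keeps in-heap metadata for every file and block (most famously on the NameNode, roughly 150 bytes per object). The point is that with tiny files, heap grows with the number of files rather than with the amount of data.

```scala
// Back-of-the-envelope illustration of the small-files problem.
// All numbers below are hypothetical assumptions, not cluster measurements.
object SmallFilesHeapEstimate extends App {
  val bytesPerObject   = 150L        // rough rule of thumb per file/block object in the NameNode heap
  val avgFileSizeBytes = 512L * 1024 // say ~512 KB per raw file (somewhere between 8 KB and 2 MB)
  val totalDataBytes   = 2L * 1024 * 1024 * 1024 * 1024 // assume 2 TB of accumulated raw data

  val numFiles  = totalDataBytes / avgFileSizeBytes // ~4.2 million tiny files
  val numBlocks = numFiles                          // each tiny file still occupies its own block
  val heapMB    = (numFiles + numBlocks) * bytesPerObject / (1024.0 * 1024)

  println(f"$numFiles%,d files -> roughly $heapMB%.0f MB of metadata heap")
  // The same 2 TB packed into ~128 MB files would be only ~16,000 files
  // and a few MB of metadata: hundreds of times less heap pressure.
}
```

Multiply that by months of accumulation and ever more raw sources, and the GC pauses above stop being surprising.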

The full resolution is too much for this article, so I’ll explain it in later stories.

Summing up the experience

It took us up to two weeks to get back to a normal, healthy cluster (and we lost far too much data along the way). Those were two hard weeks. We felt responsible for the problem, yet we couldn’t get a grip on it. We are usually a highly valued team in the organization; we started this project from scratch, without any previous big data knowledge. Having this problem suddenly tarnish our name was shocking to all of us, including management and our clients. The rock-solid, almighty cluster that we have is in fact very sensitive, and when instability hits, we all have a hard time.

In the end, the thing that got us back on solid ground, and our clients with us, was making sure that my team understood every puzzle we hit and why it occurred (never categorize anything as dark magic). In addition, I gave my teammates periodic time (sometimes as part of a task, other times as free time to explore) to relearn the basics and dive deep: to check out and learn new features, and even to read the source code of some of the products we use (when we couldn’t find a better source of information)…

A great habit we picked up was studying the internals of the critical products we rely on (just search for the product name together with the word “internals”). We started sharing the new things we learned, so everyone could see how strong we are as a team and could feel our great sense of duty and responsibility.

Don’t be afraid to expand your knowledge beyond the user manuals; that’s when you get out of the box!
