March 21, 2014

Tame the Machine Learning Beast With Apache Mahout

What is Apache Mahout

Mahout is a Hindi word that refers to an elephant driver and it should be pronounced so that it rhymes with trout. The symbolism of the chosen name for the library should be obvious since the main intent is to provide scalable machine learning capabilities over Hadoop (whose mascot is an elephant).

Mahout is an open source (Apache license) machine learning and collective intelligence library built with scalability in mind, making use of Hadoop (map-reduce jobs) for some of its implementations. It can also be used as a standalone library or embedded in client code in order to solve problems that fall into one of the main categories it covers: recommendation engines, clustering and classification.

Mahout is currently (2014/03/21) at version 0.9 and moving quickly towards a fully mature 1.0 release. Some of the newest features include support for Scala, recommenders using search technologies and a neural network classifier implementation (MLP).

When to use Mahout

Two major questions need to be answered before considering Mahout as the right tool for the problem at hand – what type of problem are you trying to solve and how big is the data.

For the first question, while the list of implemented machine learning algorithms keeps growing, the answer should fall into one of the previously mentioned categories: recommendation, clustering or classification. It is also important to note that machine learning techniques, depending on the algorithm, won’t answer the question at hand with 100% precision; they provide more of a “most likely” type of solution (e.g. some classifiers won’t be able to go beyond 85% accuracy no matter how much training they get).

Second, since the main goal of the Mahout library is to be scalable, it is really important to look closely at the data that will need to be processed. If we are not talking about tens of millions of items, the overhead that Mahout brings in order to be scalable will most likely work against you. Mahout usually shines where other machine learning libraries don’t stand a chance in the face of gigantic amounts of input data. And if you are wondering how big the data should be in order to count as big data, some examples could help: Google News works with more than 3.5 million articles from more than 4500 sources daily, and the famous Netflix Prize data set contained 100 million ratings.

If the two questions don’t point you to Mahout, maybe it’s a good idea to take a look at some of the existing alternatives like Spark MLlib or the Weka library.

Another important aspect is the fact that Mahout is written in Java and consequently it integrates really well with Java applications. However, that should not be a showstopper for application written on other platforms, since there were success stories that involved Mahout and other languages (Ruby for

Some of the use cases where Mahout was used successfully and empowered the business can be found here:

Input Data

One last note before diving into the 3 main categories of algorithms: For the vast majority of machine learning problems the most crucial role in the whole solution is the attention paid to the input data. The famous “garbage in, garbage out” saying applies here more than ever. What data do you choose? What is relevant? How do you avoid corrupted input? How do you prepare it for the specific algorithms? All these questions have a crucial role in the success of the implemented solution, and in most machine learning problems most of the effort is channeled in this first phase of the entire flow.

Recommendations Engine

If you have ever visited a shopping site like Amazon or O’Reilly you might have noticed, when looking at an item (e.g. a book), something along the lines of “you might also be interested in these other items (books): …”. For the book Mahout in Action on Amazon, I get books like “Machine Learning in Action,” “Programming Pig,” “Programming Hive,” etc. If you’ve never seen this before, you might ask yourself “what kind of sorcery is this? How did they know that I would be interested in those Hadoop books?” The answer is a good recommendations engine.

Recommendation engines are widely used from shopping sites to applications like Google Maps in order to provide useful contextual information and options to the user. Their job, in a nutshell, is to select and rank the best of many possible alternatives. There are multiple types of recommenders out there (baseline predictor, item-item or user-user collaborative filtering, content-based recommender, dimensionality reduction, interactive critique-based recommenders) and only a subset are available in Mahout, but for most use-cases the user-based recommender will do the job. I’ll go through a simple example where we can use such a recommender and how to make use of Mahout in order to achieve our goal.

The problem: Having a bookstore business, how can we recommend a new book to an existing user?

As already mentioned, most of the effort required to build a good recommendation system is put into getting the right data to the recommendation engine in the first place. In our toy example this is easy because we consider the existing (input) data already formatted in the following way:


The 3 columns on each line represent the user id (e.g. 1, 2, 3), the id of the book that the user expressed a preference for (e.g. 103, 101), and the preference score (0-5). For example, the user with id 1 expressed a strong preference of 5 for the book with id 101. So, having this input data, what can we recommend to the user with id 1? Intuition tells us that since user 1 had a preference for book 101 and users 4 and 5 also had a preference for the same book, we might look at other books that users 4 and 5 liked but user 1 has never seen before (so maybe one of the books 104, 105, 106).
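
The intuition above can be sketched without any library. The following is a minimal plain-Java illustration of user-based collaborative filtering, not Mahout's API. The class name UserBasedSketch, the similarity formula (1 / (1 + Euclidean distance) over the books both users rated) and the sample ratings are all assumptions made up for illustration, chosen only to agree with the description of users 1, 4 and 5.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy user-based recommender in plain Java; an illustration, not Mahout's API. */
final class UserBasedSketch {

    /** Hypothetical ratings: user id -> (book id -> preference score). */
    static Map<Long, Map<Long, Double>> sampleRatings() {
        Map<Long, Map<Long, Double>> ratings = new HashMap<>();
        rate(ratings, 1, 101, 5.0); rate(ratings, 1, 102, 3.0);
        rate(ratings, 2, 102, 2.0); rate(ratings, 2, 103, 2.0);
        rate(ratings, 4, 101, 5.0); rate(ratings, 4, 104, 4.5); rate(ratings, 4, 106, 4.0);
        rate(ratings, 5, 101, 4.0); rate(ratings, 5, 104, 5.0); rate(ratings, 5, 105, 3.5);
        return ratings;
    }

    private static void rate(Map<Long, Map<Long, Double>> ratings, long user, long book, double score) {
        ratings.computeIfAbsent(user, u -> new HashMap<>()).put(book, score);
    }

    /** Similarity in (0, 1] based on the books both users rated; 0 if they share none. */
    static double similarity(Map<Long, Double> a, Map<Long, Double> b) {
        double sumOfSquares = 0.0;
        int shared = 0;
        for (Map.Entry<Long, Double> entry : a.entrySet()) {
            Double other = b.get(entry.getKey());
            if (other != null) {
                double diff = entry.getValue() - other;
                sumOfSquares += diff * diff;
                shared++;
            }
        }
        return shared == 0 ? 0.0 : 1.0 / (1.0 + Math.sqrt(sumOfSquares));
    }

    /** Ranks unseen books by the similarity-weighted average score of the other users. */
    static List<Long> recommend(Map<Long, Map<Long, Double>> ratings, long userId, int howMany) {
        Map<Long, Double> mine = ratings.get(userId);
        Map<Long, Double> weightedSums = new HashMap<>();
        Map<Long, Double> similaritySums = new HashMap<>();
        for (Map.Entry<Long, Map<Long, Double>> other : ratings.entrySet()) {
            if (other.getKey() == userId) continue;
            double sim = similarity(mine, other.getValue());
            if (sim <= 0.0) continue;
            for (Map.Entry<Long, Double> rating : other.getValue().entrySet()) {
                if (mine.containsKey(rating.getKey())) continue; // already seen by this user
                weightedSums.merge(rating.getKey(), sim * rating.getValue(), Double::sum);
                similaritySums.merge(rating.getKey(), sim, Double::sum);
            }
        }
        Map<Long, Double> predicted = new HashMap<>();
        for (Long book : weightedSums.keySet()) {
            predicted.put(book, weightedSums.get(book) / similaritySums.get(book));
        }
        List<Long> books = new ArrayList<>(predicted.keySet());
        books.sort((x, y) -> Double.compare(predicted.get(y), predicted.get(x)));
        return new ArrayList<>(books.subList(0, Math.min(howMany, books.size())));
    }

    public static void main(String[] args) {
        System.out.println(recommend(sampleRatings(), 1L, 3));
    }
}
```

With these made-up ratings, user 1 is most similar to user 4, so book 104 (liked by both users 4 and 5) ends up first, followed by 106 and 105.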

Apart from the mentioned classes and implemented algorithms, Mahout also provides the tools to run a recommender engine on a Hadoop cluster, meaning a Hadoop recommender job (the steps to run such a job are provided later on). The provided Hadoop job builds on the idea of a co-occurrence matrix for the items being recommended, and adapts the way the recommendations are computed from this matrix to a map-reduce approach in order to be scalable.
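
To make the co-occurrence idea concrete, here is a single-machine sketch in plain Java. It is not the Mahout Hadoop job and makes no claim about how that job is implemented internally. It only shows the principle: count how often two books are preferred by the same user, then score unseen books by multiplying those counts with the user's own preferences. The class name CooccurrenceSketch and the sample preferences are hypothetical.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Item co-occurrence scoring in plain Java; an illustration, not the Mahout job. */
final class CooccurrenceSketch {

    /** Hypothetical preferences: user id -> (book id -> preference score). */
    static Map<Long, Map<Long, Double>> samplePrefs() {
        Map<Long, Map<Long, Double>> prefs = new HashMap<>();
        put(prefs, 1, 101, 5.0); put(prefs, 1, 102, 3.0);
        put(prefs, 2, 102, 2.0); put(prefs, 2, 103, 2.0);
        put(prefs, 4, 101, 5.0); put(prefs, 4, 104, 4.5); put(prefs, 4, 106, 4.0);
        put(prefs, 5, 101, 4.0); put(prefs, 5, 104, 5.0); put(prefs, 5, 105, 3.5);
        return prefs;
    }

    private static void put(Map<Long, Map<Long, Double>> prefs, long user, long book, double score) {
        prefs.computeIfAbsent(user, u -> new HashMap<>()).put(book, score);
    }

    /** matrix[a][b] = number of users who expressed a preference for both a and b. */
    static Map<Long, Map<Long, Integer>> cooccurrence(Collection<Map<Long, Double>> allPrefs) {
        Map<Long, Map<Long, Integer>> matrix = new HashMap<>();
        for (Map<Long, Double> prefs : allPrefs) {
            for (Long a : prefs.keySet()) {
                for (Long b : prefs.keySet()) {
                    if (!a.equals(b)) {
                        matrix.computeIfAbsent(a, key -> new HashMap<>()).merge(b, 1, Integer::sum);
                    }
                }
            }
        }
        return matrix;
    }

    /** Multiplies the matrix by the user's preference vector, skipping books already rated. */
    static Map<Long, Double> scores(Map<Long, Map<Long, Integer>> matrix, Map<Long, Double> userPrefs) {
        Map<Long, Double> result = new HashMap<>();
        for (Map.Entry<Long, Double> pref : userPrefs.entrySet()) {
            Map<Long, Integer> row = matrix.getOrDefault(pref.getKey(), Collections.emptyMap());
            for (Map.Entry<Long, Integer> co : row.entrySet()) {
                if (!userPrefs.containsKey(co.getKey())) {
                    result.merge(co.getKey(), co.getValue() * pref.getValue(), Double::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Map<Long, Double>> prefs = samplePrefs();
        System.out.println(scores(cooccurrence(prefs.values()), prefs.get(1L)));
    }
}
```

Both steps are sums over independent users and items, which is what makes this formulation a natural fit for map-reduce.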

Recommendation engines can be cleverly combined with clustering and classification algorithms or even Apache Solr in order to produce smarter, more relevant and faster results.


Clustering

Clustering is an unsupervised learning technique that can find structure in your data by grouping similar looking pieces of data into a set or cluster. At the core of such an approach is, as you might have guessed, similarity or, more specifically, the way we define similarity. This translates to which features or attributes we choose as being representative for our elements and which metric we use to compare our elements.

As a simple example, if you have a bunch of books and you want to group them together in different piles, you might choose to group them by author (so one pile for each author) or by similar topic (so the computer science books go in one pile, while the biology ones go in another) or even by the color of their cover. You will end up with different piles depending on the chosen attributes of the books (author, topic, cover color) and metrics (same author, similar topic, same color).

As with recommenders, the quality and structure of the input data are of paramount importance. In order to prepare the input data for a clustering process, the following steps should be taken:

  1. Decide what features/attributes are relevant for your data elements and should drive the clustering process
  2. Vectorize the chosen features (vector is used here in the mathematical sense: think of it as a point in a multidimensional space, the number of dimensions being equal to the number of different features chosen per data element). This usually translates into assigning a numeric value on a scale for each of the attributes.
  3. Decide what metric is most appropriate for your data and attributes (there is a vast array to choose from or you can define your own)

This will turn your data into more of a mathematical model, one that the clustering algorithm can reason about. Let’s go through another example and see how Mahout can help us in this case.

Consider a set of points in a two-dimensional space, where each point is defined by two coordinates x and y: {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {8, 8}, {9, 8}, {8, 9}, {9, 9}. If we plot these points on an x-y plane, two clusters are obvious: the first 5 points belong to the first one and the last 4 to the second. This is a simple and obvious example where the input data is already vectorized for us and the metric stands out as the Euclidean distance, but understanding this sample and seeing it work in Mahout is all that it takes to start working with clustering. And speaking of Mahout, let’s see how we implement this:

        //input data
        double[][] points = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {8, 8}, {9, 8}, {8, 9}, {9, 9} };

        //vectorize the input data and prepare the input files.
        List<Vector> vectors = new ArrayList<Vector>();
        for (int i = 0; i < points.length; i++) {
            double[] fr = points[i];
            Vector vec = new NamedVector(new RandomAccessSparseVector(fr.length), "point" + i);
            vec.assign(fr);
            vectors.add(vec);
        }
        //Hadoop configuration and file system used for all the sequence files below
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path("clustering/testdata/points/file1"),
                LongWritable.class, VectorWritable.class);
        long recNum = 0;
        VectorWritable vec = new VectorWritable();
        for (Vector point : vectors) {
            vec.set(point);
            writer.append(new LongWritable(recNum++), vec);
        }
        writer.close();

        //choose two initial cluster centers to begin with (here simply points 7 and 6)
        Path path = new Path("clustering/testdata/clusters/part-00000");
        SequenceFile.Writer clusterWriter = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class);
        int k = 2;
        for (int i = 0; i < k; i++) {
            Vector center = vectors.get(7 - i);
            Kluster cluster = new Kluster(center, i, new EuclideanDistanceMeasure());
            clusterWriter.append(new Text(cluster.getIdentifier()), cluster);
        }
        clusterWriter.close();

        //run the k-means algorithm
        KMeansDriver.run(conf,
                new Path("clustering/testdata/points"),
                new Path("clustering/testdata/clusters"),
                new Path("clustering/output"),
                0.001, 10, true, 0.001, true);

        //read the clustered points back and print the cluster each one belongs to
        SequenceFile.Reader reader = new SequenceFile.Reader(fs,
                new Path("clustering/output/" + Cluster.CLUSTERED_POINTS_DIR + "/part-m-0"), conf);

        IntWritable key = new IntWritable();
        WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();
        while (reader.next(key, value)) {
            System.out.println(value.toString() + " belongs to cluster " + key.toString());
        }
        reader.close();

This is a bit more code compared to what we’ve seen for a simple recommender; however, most of it is preparing data and reading the results. The actual clustering process takes place with the help of KMeansDriver, and that is just a single call.

Running the code will output something along these lines:

wt: 0.8448555025571853 distance: 1.1313708498984762  vec: point0 = [1.000, 1.000] belongs to cluster 0
wt: 0.8568850271670079 distance: 0.8246211251235319  vec: point1 = [2.000, 1.000] belongs to cluster 0
wt: 0.8568850271670079 distance: 0.8246211251235319  vec: point2 = [1.000, 2.000] belongs to cluster 0
wt: 0.8882076772796631 distance: 0.2828427124746191  vec: point3 = [2.000, 2.000] belongs to cluster 0
wt: 0.7649671448099288 distance: 1.6970562748477138  vec: point4 = [3.000, 3.000] belongs to cluster 0
wt: 0.8512355175387429 distance: 0.7071067811865476  vec: point5 = [8.000, 8.000] belongs to cluster 1
wt: 0.8601727624507451 distance: 0.7071067811865476  vec: point6 = [9.000, 8.000] belongs to cluster 1
wt: 0.8601727624507451 distance: 0.7071067811865476  vec: point7 = [8.000, 9.000] belongs to cluster 1
wt: 0.8675577686515995 distance: 0.7071067811865476  vec: point8 = [9.000, 9.000] belongs to cluster 1

And that is the expected result, just as we anticipated when looking at the input data before running the code. The results provided by k-means clustering coincide with our intuition because of the way we represented the input data, the chosen metric (EuclideanDistanceMeasure) and finally the chosen clustering algorithm along with all its configuration parameters. Had we chosen a different metric, the results could have been totally different. All in all, it is pretty amazing that with just a screen of code your application can find structure and meaning in the data. This feeling is even more intense when you are dealing with large amounts of data where you can’t even foresee the actual outcome.
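
For readers curious about what happens behind that driver call, here is a minimal plain-Java sketch of the k-means loop itself (assign each point to the nearest center, recompute the centers, repeat until nothing changes). It is a textbook version written for illustration, not Mahout's implementation. The class name KMeansSketch is hypothetical, and the initial centers are assumed to be points 7 and 6, mirroring the seeds picked in the example above.

```java
import java.util.Arrays;

/** Textbook k-means in plain Java; an illustration, not Mahout's KMeansDriver. */
final class KMeansSketch {

    /** The nine sample points from the example. */
    static final double[][] POINTS = {
        {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {8, 8}, {9, 8}, {8, 9}, {9, 9}
    };

    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int d = 0; d < a.length; d++) {
            double diff = a[d] - b[d];
            sum += diff * diff;
        }
        return sum;
    }

    /** Returns, for each point, the index of the cluster it ends up in. */
    static int[] cluster(double[][] points, double[][] initialCenters, int maxIterations) {
        int k = initialCenters.length;
        int dims = points[0].length;
        double[][] centers = new double[k][];
        for (int c = 0; c < k; c++) {
            centers[c] = initialCenters[c].clone();
        }
        int[] assignment = new int[points.length];
        Arrays.fill(assignment, -1);

        for (int iteration = 0; iteration < maxIterations; iteration++) {
            // Assignment step: each point goes to its nearest center (ties go to the lower index).
            boolean changed = false;
            for (int p = 0; p < points.length; p++) {
                int best = 0;
                double bestDistance = Double.MAX_VALUE;
                for (int c = 0; c < k; c++) {
                    double distance = squaredDistance(points[p], centers[c]);
                    if (distance < bestDistance) {
                        bestDistance = distance;
                        best = c;
                    }
                }
                if (assignment[p] != best) {
                    assignment[p] = best;
                    changed = true;
                }
            }
            if (!changed) {
                break; // converged: no point switched cluster
            }
            // Update step: move each center to the mean of its points (empty clusters stay put).
            double[][] sums = new double[k][dims];
            int[] counts = new int[k];
            for (int p = 0; p < points.length; p++) {
                counts[assignment[p]]++;
                for (int d = 0; d < dims; d++) {
                    sums[assignment[p]][d] += points[p][d];
                }
            }
            for (int c = 0; c < k; c++) {
                if (counts[c] > 0) {
                    for (int d = 0; d < dims; d++) {
                        centers[c][d] = sums[c][d] / counts[c];
                    }
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        double[][] seeds = { POINTS[7], POINTS[6] };
        System.out.println(Arrays.toString(cluster(POINTS, seeds, 10)));
    }
}
```

Even though both seeds sit in the upper-right group, the loop separates the two groups after a couple of iterations, which matches the grouping in the output shown above.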

Mahout provides at least five different clustering algorithms (k-means, fuzzy k-means, Meanshift, Dirichlet and Latent Dirichlet), each with a wide array of configuration options that might prove tricky to get right for your data on the first run. It also provides a set of predefined metrics including Euclidean Distance, Squared Euclidean Distance, Manhattan Distance, Cosine Distance and Tanimoto Distance, to name a few, along with the possibility of implementing your own distance measure.
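
To give a feel for how these metrics differ, the sketch below writes out the usual textbook definitions in plain Java. These are not Mahout's classes, and Mahout's own implementations may differ in details such as the handling of zero vectors; the class name DistanceSketch is hypothetical.

```java
/** Textbook distance measures in plain Java; an illustration, not Mahout's classes. */
final class DistanceSketch {

    static double dot(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            sum += a[i] * b[i];
        }
        return sum;
    }

    static double squaredEuclidean(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return sum;
    }

    static double euclidean(double[] a, double[] b) {
        return Math.sqrt(squaredEuclidean(a, b));
    }

    static double manhattan(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            sum += Math.abs(a[i] - b[i]);
        }
        return sum;
    }

    /** 1 - cosine of the angle between a and b (NaN if either vector is all zeros). */
    static double cosine(double[] a, double[] b) {
        return 1.0 - dot(a, b) / (Math.sqrt(dot(a, a)) * Math.sqrt(dot(b, b)));
    }

    /** 1 - a.b / (|a|^2 + |b|^2 - a.b) (NaN if both vectors are all zeros). */
    static double tanimoto(double[] a, double[] b) {
        double ab = dot(a, b);
        return 1.0 - ab / (dot(a, a) + dot(b, b) - ab);
    }

    public static void main(String[] args) {
        double[] p = {1, 1};
        double[] q = {4, 5};
        System.out.println(euclidean(p, q) + " " + manhattan(p, q) + " " + cosine(p, q));
    }
}
```

Euclidean and Manhattan distances depend on magnitudes, while the cosine distance only looks at direction, which is one reason why switching metric can change the resulting clusters.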

Choosing the right algorithm and configuration parameters, along with the right distance measure and the way to vectorize the input data, won’t always be an easy or obvious task, but Mahout helps here as well by providing the means and tools to quickly iterate through this process of fine tuning.

Related to vectorizing the input data, one widespread use of clustering is on text documents, where in order to cluster the data you’ll need to express in numbers how important a word is for a certain document. The technique used here usually revolves around something called “term frequency – inverse document frequency” or TF-IDF, which boils down to counting the words and assigning them weights according to how frequent they are across all documents. Mahout will be happy to help you here as well with implementations like DocumentProcessor, DictionaryVectorizer and TFIDFConverter.
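
As a rough illustration of the idea, the following plain-Java sketch computes TF-IDF weights with one common textbook formula: raw term count multiplied by log(N / document frequency). This is an assumption made for illustration; Mahout's TFIDFConverter may use a different weighting variant. The class name TfIdfSketch and the three tiny documents are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

/** Textbook TF-IDF in plain Java; an illustration, not Mahout's TFIDFConverter. */
final class TfIdfSketch {

    /** Hypothetical, already tokenized documents. */
    static List<List<String>> sampleDocs() {
        return Arrays.asList(
            Arrays.asList("mahout", "scales", "hadoop"),
            Arrays.asList("mahout", "clusters", "text"),
            Arrays.asList("mahout", "mahout", "recommends", "books"));
    }

    /** Returns one term -> weight map per document. */
    static List<Map<String, Double>> tfIdf(List<List<String>> docs) {
        // Document frequency: in how many documents does each term appear?
        Map<String, Integer> docFreq = new HashMap<>();
        for (List<String> doc : docs) {
            for (String term : new HashSet<>(doc)) {
                docFreq.merge(term, 1, Integer::sum);
            }
        }
        List<Map<String, Double>> weights = new ArrayList<>();
        for (List<String> doc : docs) {
            // Term frequency: how often does each term appear in this document?
            Map<String, Integer> termFreq = new HashMap<>();
            for (String term : doc) {
                termFreq.merge(term, 1, Integer::sum);
            }
            Map<String, Double> docWeights = new HashMap<>();
            for (Map.Entry<String, Integer> entry : termFreq.entrySet()) {
                double idf = Math.log((double) docs.size() / docFreq.get(entry.getKey()));
                docWeights.put(entry.getKey(), entry.getValue() * idf);
            }
            weights.add(docWeights);
        }
        return weights;
    }

    public static void main(String[] args) {
        System.out.println(tfIdf(sampleDocs()));
    }
}
```

In this toy corpus the word "mahout" appears in every document, so its weight drops to zero, while a word unique to one document keeps a positive weight. The resulting maps are the vectors a clustering algorithm would then compare.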


Classification

Classification is the process of using specific information or input to choose a single selection or target from a short list of predetermined potential responses by using a trained model or previous experiences. Some examples of classification at work are spam detection (categorizing an email as either spam or valid), credit card fraud detection and predicting customer attrition for a business. In all these cases the computer is trying to emulate certain forms of human decision making.

Classification is a form of supervised learning, and this is perhaps the main characteristic that makes it easy to tell apart from clustering (which is unsupervised learning). What is meant by supervised in this context is that in order to have a working classifier you first need to train it by providing correct examples. So in the case of the spam/valid email problem, before using the classifier you will need to provide a good set of emails clearly marked as spam or valid.

In order to have a working classifier, there is usually a three-step cycle that you’ll have to work your way through: train the model, evaluate and fine-tune it, then use it in production (gathering feedback and metrics), repeating as long as necessary to get a good result. Of course, as with the other machine learning problems, the attention given to the input data will pay off handsomely here as well. However, in the case of classifiers most of the effort will be split between choosing the right way to vectorize the data, providing the correct training examples and tuning the classification parameters. In a production model it is not unusual to run the learning algorithm hundreds or thousands of times to find good values.

Mahout provides all the tools needed to have a working classification engine. It comes with an array of implementations to choose from, like Logistic Regression (SGD), Naive and Complementary Bayes, Hidden Markov Models and Random Forests, as well as some experiments in the area of Support Vector Machines and Neural Networks. Some of those algorithms are map-reduce ready (Bayes, HMM) while others are limited to sequential, single machine training and running (SGD). It also provides the required jars and CLIs so that you can easily evaluate and fine-tune a classification model by calculating the AUC, confusion matrix and average log likelihood, and even by dissecting a trained model. However, before jumping on Mahout for every classification problem that you encounter, you will first need to determine the size of your data, just like for the other types of ML problems. As a rule of thumb, if the number of training examples doesn’t exceed 500,000 items, you are probably better off with something other than Mahout. The overhead of scaling algorithms that makes Mahout shine where others fail (e.g. more than 10 million training items) will just stand in your way for small data sets.
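
Of the evaluation measures mentioned above, the confusion matrix is the easiest to picture. The plain-Java sketch below (not Mahout's evaluation tooling; the class name ConfusionMatrixSketch and the label arrays are hypothetical) counts how often each actual category was predicted as each other category and derives the overall accuracy from the diagonal.

```java
/** Confusion matrix and accuracy in plain Java; an illustration, not Mahout's tooling. */
final class ConfusionMatrixSketch {

    /** matrix[actual][predicted] = number of examples with that combination. */
    static int[][] confusionMatrix(int[] actual, int[] predicted, int numCategories) {
        int[][] matrix = new int[numCategories][numCategories];
        for (int i = 0; i < actual.length; i++) {
            matrix[actual[i]][predicted[i]]++;
        }
        return matrix;
    }

    /** Fraction of examples on the diagonal, i.e. classified correctly. */
    static double accuracy(int[][] matrix) {
        int correct = 0;
        int total = 0;
        for (int row = 0; row < matrix.length; row++) {
            for (int col = 0; col < matrix[row].length; col++) {
                total += matrix[row][col];
                if (row == col) {
                    correct += matrix[row][col];
                }
            }
        }
        return total == 0 ? 0.0 : (double) correct / total;
    }

    public static void main(String[] args) {
        int[] actual = {0, 0, 1, 1, 2};    // hypothetical true categories
        int[] predicted = {0, 1, 1, 1, 2}; // hypothetical classifier output
        int[][] matrix = confusionMatrix(actual, predicted, 3);
        System.out.println(java.util.Arrays.deepToString(matrix) + " accuracy=" + accuracy(matrix));
    }
}
```

Off-diagonal cells show which categories get confused with each other, which is often more useful for fine-tuning than the single accuracy number.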

After solving the size problem you will need to ask another set of questions that would guide you to a good design for your classifier:

  1. What is the batch size for classification?
  2. What is the required response time for classification batches?
  3. How many classifications per second, in total, need to be done?
  4. What is the expected peak load for the system?

Through it all, it’s important to remember that building good classification models requires many cycles of iterative refinement. You need to make sure your training pipeline makes these iterations efficient. Also, in a production environment, in order to achieve high speed and 100% uptime, it is advisable to use a pool of classifiers together with a coordination service (e.g. ZooKeeper) in order to correctly propagate the model updates.

As mentioned, great care needs to be taken when encoding the features. There is only a strict set of accepted types, and failing to correctly identify them would greatly affect the performance of your classifier:

  • Continuous values : floating point values like price, weight, time
  • Categorical : one value out of a predefined set of values like boolean true/false or item ID
  • Word-like : just like a categorical value, but with an open-ended set of possible values
  • Text-like : a sequence of word-like values, all of the same type like text or a list of emails

In addition to correctly identifying the type of features for your dataset, you will also need to take care not to fall into the “target leak” pit. Target leaking occurs when a feature in the training set provides information about the target (output category) in a way that won’t happen in production, just like handing out the exam answers along with the exam. One example of such a leak would be to use the generated ID of the training example or the training filename (considering that production data won’t have those).
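
To illustrate how the four feature types listed above can end up in one fixed-size numeric vector, here is a plain-Java sketch of hashed feature encoding. It is written loosely in the spirit of a hashing encoder and is not Mahout's implementation: the class name HashedEncodingSketch, the use of String.hashCode and the tokenization rule are all assumptions made for illustration.

```java
/** Hashed feature encoding in plain Java; an illustration, not Mahout's encoders. */
final class HashedEncodingSketch {

    /** Maps a (feature name, value) pair to a position in a vector of the given size. */
    static int slot(String feature, String value, int size) {
        return Math.floorMod((feature + "=" + value).hashCode(), size);
    }

    /** Continuous: the numeric value itself is stored in the feature's slot. */
    static void addContinuous(double[] vector, String feature, double value) {
        vector[slot(feature, "", vector.length)] += value;
    }

    /** Categorical or word-like: each distinct value gets its own slot with weight 1. */
    static void addCategorical(double[] vector, String feature, String category) {
        vector[slot(feature, category, vector.length)] += 1.0;
    }

    /** Text-like: a sequence of word-like values, one increment per token. */
    static void addText(double[] vector, String feature, String text) {
        for (String word : text.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                addCategorical(vector, feature, word);
            }
        }
    }

    public static void main(String[] args) {
        double[] vector = new double[32]; // small on purpose; collisions are possible
        addContinuous(vector, "price", 2.5);
        addCategorical(vector, "inStock", "true");
        addText(vector, "body", "Free money, free books");
        System.out.println(java.util.Arrays.toString(vector));
    }
}
```

The vector size is fixed up front no matter how many distinct words show up, at the price of occasional collisions between unrelated values.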

In order to illustrate the classification capabilities of Mahout, we’ll go through a simple example of classifying email messages into a predefined set of topics from a discussion group. The dataset is quite a common one among classification examples and it contains 20 topics or categories. This topic will be our target for the classifier, and what we would like to do is, after training the classification model with some sample messages belonging to the twenty categories, provide an email message as input and get as output the topic it should be assigned to.

The dataset we are using is already well partitioned into training and testing sets with messages belonging to each topic so we can go ahead and start training our model.

We’ve chosen Logistic Regression as the classification algorithm and that can be instantiated like this:

//20 target categories, feature vectors of size FEATURES, L1 prior
int FEATURES = 10000;
OnlineLogisticRegression learningAlgorithm = new OnlineLogisticRegression(20, FEATURES, new L1());

Next, we’ll need to read the training set. The 20news dataset is already sanitized and the training email messages are nicely provided in a separate folder (20news-bydate-train) containing subfolders like talk.politics.mideast, talk.politics.misc or talk.religion.misc, which in turn contain the actual email messages as individual files with a random id as filename. As a note, using this file name id as a feature in the classification model would be a possible target leak. The email attributes worth keeping are the subject, the sender (from), the number of lines, the keywords and the actual email body.

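        File base = new File("classification/input-data/20news-bydate-train");
        List<File> files = new ArrayList<File>();
        Dictionary newsGroups = new Dictionary();
        for (File newsgroup : base.listFiles()) {
            if (!newsgroup.isDirectory()) continue; //each subfolder is one newsgroup (category)
            newsGroups.intern(newsgroup.getName());
            files.addAll(Arrays.asList(newsgroup.listFiles()));
        }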

Having this input, we need to vectorize each email message, feed it to the classification model and also check the progress along the way. First we’ll need some feature encoders, provided by Mahout:

        Map<String, Set<Integer>> traceDictionary = new TreeMap<String, Set<Integer>>();
        FeatureVectorEncoder encoder = new StaticWordValueEncoder("body");
        FeatureVectorEncoder bias = new ConstantValueEncoder("Intercept");
        FeatureVectorEncoder lines = new ConstantValueEncoder("Lines");
        Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_46);

Then, for each input file (from the files collection), encode it:

            //k, averageLineCount and the countWords helper are defined outside this fragment
            String ng = file.getParentFile().getName();
            int actual = newsGroups.intern(ng);
            BufferedReader reader = new BufferedReader(new FileReader(file));
            Multiset<String> words = ConcurrentHashMultiset.create();
            double lineCount = 0;
            String line = reader.readLine();
            //parse the headers; they end at the first empty line
            while (line != null && line.length() > 0) {
                if (line.startsWith("Lines:")) {
                    String count = Iterables.get(Arrays.asList(line.split(":")), 1);
                    try {
                        count = count != null ? count.trim() : count;
                        lineCount = Integer.parseInt(count);
                        averageLineCount += (lineCount - averageLineCount) / Math.min(k + 1, 1000);
                    } catch (NumberFormatException e) {
                        //fall back to the running average when the header is malformed
                        lineCount = averageLineCount;
                    }
                }
                boolean countHeader = (line.startsWith("From:") || line.startsWith("Subject:") ||
                        line.startsWith("Keywords:") || line.startsWith("Summary:"));
                do {
                    StringReader in = new StringReader(line);
                    if (countHeader) {
                        countWords(analyzer, words, in);
                    }
                    line = reader.readLine();
                } while (line != null && line.startsWith(" ")); //continuation lines start with a space
            }
            //count the words of the message body
            countWords(analyzer, words, reader);
            reader.close();
            //encode everything into one sparse feature vector
            RandomAccessSparseVector vector = new RandomAccessSparseVector(FEATURES);
            bias.addToVector((String) null, 1, vector);
            lines.addToVector((String) null, lineCount / 30, vector);
            lines.addToVector((String) null, Math.log(lineCount + 1), vector);
            for (String word : words.elementSet()) {
                encoder.addToVector(word, Math.log(1 + words.count(word)), vector);
            }

and then feed it to the classification model

            //weight of the newest sample in the running averages (window capped at 200 samples)
            double mu = Math.min(k + 1, 200);
            double ll = learningAlgorithm.logLikelihood(actual, vector);
            averageLL = averageLL + (ll - averageLL) / mu;
            //classify with the current model before training on this sample, to track accuracy
            org.apache.mahout.math.Vector p = new DenseVector(20);
            learningAlgorithm.classifyFull(p, vector);
            int estimated = p.maxValueIndex();
            int correct = (estimated == actual ? 1 : 0);
            averageCorrect = averageCorrect + (correct - averageCorrect) / mu;
            learningAlgorithm.train(actual, vector);
            //print progress at increasingly sparse intervals (bumps and step are defined outside this fragment)
            int bump = bumps[(int) Math.floor(step) % bumps.length];
            int scale = (int) Math.pow(10, Math.floor(step / bumps.length));
            if (k % (bump * scale) == 0) {
                step += 0.25;
                System.out.printf("%10d %10.3f %10.3f %10.2f %s %s\n",
                        k, ll, averageLL, averageCorrect * 100, ng,
                        newsGroups.values().get(estimated));
            }
After all the input files have been processed, we’ll need to close the learning process and write the created model to a file. This model can then be loaded up by another process and used in order to classify email messages into one of the twenty categories.

        learningAlgorithm.close();
        File algo = new File("classification/algo");
        DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(new File(algo, "algo.dat")));
        learningAlgorithm.write(outputStream);
        outputStream.close();
        ObjectOutputStream stream = new ObjectOutputStream(new FileOutputStream(new File(algo, "dict.dat")));
        for (String category : newsGroups.values()) { stream.writeObject(category); }
        stream.close();

The amount of code is definitely bigger than for the previous two samples (recommender and clustering) and this is only half of it (although loading and using the model is quite similar to this), but the results are worth it. It’s quite remarkable that with the help of Mahout, by writing a couple of lines of code, you have at your disposal a trained classification engine with an accuracy as high as 80% (research puts the most accurate ones at about 86%).

In order to use the trained model what you need to do is:

        //load the trained model and the category names written during training
        OnlineLogisticRegression algo = new OnlineLogisticRegression();
        algo.readFields(new DataInputStream(new FileInputStream(algoFile)));
        ObjectInputStream inDict = new ObjectInputStream(new FileInputStream(dictFile));
        Dictionary newsGroups = new Dictionary();
        for (int i = 0; i < algo.numCategories(); i++) {
            newsGroups.intern((String) inDict.readObject());
        }
        inDict.close();

then vectorize the input message just as for the training part and send it as input to the classification method

        org.apache.mahout.math.Vector p = new DenseVector(20);
        algo.classifyFull(p, vector);
        int resultClass = p.maxValueIndex();
        System.out.println("File " + testFile + " was classified as " + newsGroups.values().get(resultClass));

Setup Mahout

Just add the following dependency in your pom (for Maven users)


and you are set to use Mahout in stand-alone mode.

In order to use it on top of Hadoop, first you’ll need to set up a Hadoop cluster (for development and experimenting a pseudo distributed single node machine will do) and then you can either write your own map-reduce jobs that will use the existing Mahout algorithms or use directly the provided distributed implementations from Mahout.

However, you’ll need to check which version of Hadoop the provided Mahout tools and jobs were compiled against. For example, support for Hadoop 2 was only recently added.

The provided MapReduce jobs mentioned above can be found in the Mahout distribution in the core-job jar.

ex: for version 0.9

– download the distribution from   (ex:

– unzip in a local folder (ex: ~/tools/mahout-distribution-0.9). In the extracted folder you will find the core job jar (ex: ~/tools/mahout-distribution-0.9/mahout-core-0.9-job.jar )

Setup Hadoop single-node on local machine

Please note that some of the configuration instructions specific to Hadoop paths and config files might differ depending on the actual version of Hadoop used.

1. make sure you have Java >= 1.6  (type java -version on a console in order to check)

2. make sure you have enabled self-login

a. if you don’t already have a public key (.ssh/, generate one:

ssh-keygen -t rsa -P ""

b. add your public key to the authorized list

cat ~/.ssh/ >> ~/.ssh/authorized_keys

c. try it out

ssh localhost

3. Download the desired Hadoop version from

4. Extract the content in a local folder

ex: ~/tools/hadoop

5. Update the Hadoop configuration

a. ( ~/tools/hadoop/conf ) with the Java home path

update export JAVA_HOME=/Library/Java/Home

b. core-site.xml ( ~/tools/hadoop/conf/core-site.xml) with

  <description>A base for other temporary directories.</description>

  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>

c. hdfs-site.xml with

<description>Default block replication.
  The actual number of replications can be specified when the file is created.
  The default is used if replication is not specified in create time.</description>

d. mapred-site.xml (copy from mapred-site.xml.template) with

<description>The host and port that the MapReduce job tracker runs
  at. If "local", then jobs are run in-process as a single map
  and reduce task.</description>

<description>The maximum number of tasks that will be run simultaneously by
  a task tracker.</description>


6. start everything

a. Format the namenode:

~/tools/hadoop/bin/hadoop namenode -format

b. start


c. give it a try at http://localhost:50070/dfshealth.jsp

Running a Mahout job from command line

Copy the links input file:

1. Download the file:


2. Create an input folder on hdfs

~/tools/hadoop/bin/hadoop fs -mkdir /input

check it out at http://localhost:50075/browseDirectory.jsp?namenodeInfoPort=50070&dir=/&nnaddr=

3. copy the links input file

~/tools/hadoop/bin/hadoop fs -put links-simple-sorted.txt /input/input.txt

4. create the users.txt file (one line, containing the number 3) and upload it to hdfs

~/tools/hadoop/bin/hadoop fs -put users.txt /input/users.txt

5. run the Mahout recommender job