Let's get started with some examples using Octopython, which is our lightweight MapReduce implementation that we shall be using for our programming assignments later this week. Let's take a look at how a word-count problem can be expressed in Octopython. This is a slight modification of the file which you'll find on the site as a demo example. The directory Gutenberg Small contains a bunch of small text files, much smaller than the ones available on the site, so that we can actually execute this program in a few seconds rather than a few hours.

Notice that we have defined two functions, the map and the reduce. The map function takes a key-value pair and emits other key-value pairs, and so does the reduce. Let's see what the map function is doing. The key, in this case, is the document ID, or the document name, the file name here, and the value is the contents of that file. For every line in that file, and for every word in that line, the map function emits a pair: the word itself in lower case, and a count of one.

The reduce function also gets a bunch of key-value pairs, collected from all the mappers. In this case, the keys are words, and the values are lists of counts, which are just the ones emitted by the mappers. So, for every word, all the reduce function needs to do is figure out how many times that word was found across all the mappers, and that is simply the length of the list of values it gets from all the mappers for that particular word. Notice that the map functions are emitting counts of one, which is why it's okay for the reducer to merely sum up the lists of values that it gets from all the mappers.

We'll now run this MapReduce program using Octo. This terminal is the server, so we start it by simply calling the Octo code with the keyword server.
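As a sketch of the two functions just described, here is a minimal word-count pair in the style of the Octo demo, using the module-level names `mapfn` and `reducefn` that the octo.py demo uses (an assumption; check the demo file on the site for the exact conventions). The small in-memory driver at the bottom is purely a stand-in for the Octo server, so the sketch runs on its own:

```python
import collections

# Map: key = file name, value = file contents.
# Emits (word, 1) once for every occurrence of every word.
def mapfn(key, value):
    for line in value.splitlines():
        for word in line.split():
            yield word.lower(), 1

# Reduce: key = word, values = list of 1s from all the mappers.
# The total count is just the length of that list.
def reducefn(key, values):
    return key, len(values)

# Tiny in-memory driver standing in for the Octo server (illustration only):
source = {"doc1.txt": "To be or not to be",
          "doc2.txt": "to do is to be"}

intermediate = collections.defaultdict(list)
for k, v in source.items():
    for word, count in mapfn(k, v):
        intermediate[word].append(count)

totals = dict(reducefn(w, counts) for w, counts in intermediate.items())
print(totals)  # 'to' appears 4 times, 'be' 3 times
```

Note that `reducefn` only works because every value is a one; it counts the list rather than summing it, exactly as the lecture describes.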
We also pass it the program that implements the map and reduce functions, which is our program here. A server gets started, waiting for clients to join. We have two more shells where we will start clients by merely calling Octo with the client keyword and telling it which server to connect to, that is, which machine the server is running on. The server in our case is running on this very machine, so we use localhost as the address for the server. Let's start each of the clients, and as you can see, they are performing different map tasks on different files; then finally they cooperate to perform all the reduce tasks, in the end writing the totals to a file as well as to the screen. So, for example, they have computed that across all these files there are four occurrences of "you", two occurrences of "yet", and many others which have scrolled up beyond our view.

The rest of the code is really used by the server to set things up. In particular, a source dictionary needs to get created: the key-value pairs that each mapper needs to get are kept in a dictionary. So here we have the file name as the key and the file contents as the value, for all the files in the directory Gutenberg Small. Notice also that both clients and servers get the same program; it's just that the clients execute the map and reduce operations, while the server merely gets data from where it needs to find it, sends it out to the mappers, and finally accumulates the data from the reducers and prints out the results.

When you run this code for yourselves, some of you will notice that the processes in the map phase are only performing map operations and emitting counts of one, rather than, as we described earlier, performing partial reduce operations on whatever documents they manage to get data for. Let's see how much data is actually produced by the map phase because of this behavior. Each word is emitted by a mapper multiple times.
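The server-side setup just described, building a source dictionary of file name to file contents, can be sketched as a small helper. The function name `build_source` and the temporary demo directory are my own illustration, not part of the Octo demo; in the actual program you would point it at the Gutenberg Small directory:

```python
import os
import tempfile

def build_source(directory):
    """Build the source dictionary the server hands to the mappers:
    file name -> file contents, for every file in the directory."""
    source = {}
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path):
            with open(path) as f:
                source[name] = f.read()
    return source

# Self-contained demo directory in place of Gutenberg Small:
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "a.txt"), "w") as f:
    f.write("to be or not to be")

source = build_source(demo_dir)
print(source)  # {'a.txt': 'to be or not to be'}
```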
In fact, if the size of the initial set of documents is d, counting every occurrence of every word, then after processing by the mappers exactly that much data is produced again, because each word is emitted every time it is seen. On the other hand, in our earlier description of the word-counting problem this was not the case, since the counts within each map process were summed up before moving on to the reduce phase. This process of reduce operations being performed partially within each mapper is called a combine phase. In other words, when you sum up the word counts within every mapper before emitting, you're applying the reduce function as a combine function before the data get sorted by the MapReduce system. The result is a reduction in the amount of intermediate data produced, which leads to better parallel efficiency of the MapReduce process, as we'll see shortly.

Let's see if we can modify our Octo program to implement a combine phase, where partial reduce operations are performed before passing the data on to the actual reduce phase. Unfortunately, Octo doesn't allow for a separate combine operation alongside the map and reduce functions. However, we can simulate the combine process by ensuring that each time the map function is called, it keeps track of its results in a local variable W. The reason this is possible is a feature of Python called the generator: a function that, instead of returning a value, yields a value. The function stays in memory, all its local variables remain available, and the next time it is resumed it continues executing exactly where it left off. So let's see how it works. Every time the map function is called for a particular document, it adds up the word counts for all the words it sees in that document, and stores them in this dictionary W.
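A minimal sketch of this combiner-in-the-mapper idea, again assuming the octo.py convention of a generator-style `mapfn` (the local dictionary is named `w` here, standing in for the W of the lecture): the counts are accumulated first, and only then yielded, so each word is emitted once per document rather than once per occurrence.

```python
import collections

# Map with a simulated combine phase: sum the counts locally in w,
# then yield one (word, partial_count) pair per distinct word.
def mapfn(key, value):
    w = collections.defaultdict(int)
    for line in value.splitlines():
        for word in line.split():
            w[word.lower()] += 1
    # Because mapfn is a generator, it stays alive between yields,
    # handing back one partial count each time the caller asks.
    for word, count in w.items():
        yield word, count

# The reducer must now sum, not count, since the values are
# partial sums rather than ones.
def reducefn(key, values):
    return key, sum(values)

pairs = list(mapfn("doc1.txt", "to be or not to be"))
print(pairs)  # ('to', 2) and ('be', 2) each emitted only once
```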
Then it goes through all the words it has found in this dictionary, with their counts, and yields them as key-value pairs. When the yield statement is executed, the function returns this value to the caller, which is the Octo program. The next time the Octo program asks for a value, it yields the next count, and so on and so forth. In a sense, this is implementing the combiner within the map function. It's a little bit of cheating, because real MapReduce implementations actually have a separate combine function, in addition to the map and the reduce functions.

>> Let's now run our modified, combiner-enabled MapReduce task. We start the server, and then we start each client in turn. As you can see, they are performing the map and then the reduce phase, and now we are done. You might not have noticed earlier, but the server is actually printing out the time the combined MapReduce task took. In this case, it was twelve seconds. The actual Octo program available from the site doesn't have this timing, so I would encourage you to look through that code and insert the timing, so that you can reproduce such results. Further, do look back and figure out how much time the word-count program without the combiner actually took, and you'll see that the combine phase added a lot of efficiency.

Last but not least, it's very important to realize that the combine function can work only if the reduce function is commutative and associative, because we don't know in which order, and on what data, these reduce operations are performed. So any reduce function which relies on being applied in a particular order to all the values for a particular key cannot be split into a combine phase for efficiency purposes.
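The commutativity and associativity requirement can be seen with a tiny example (the groups and values here are my own illustration). Summation, the word-count reduce, gives the same answer however the values are grouped between the combine and reduce phases; a mean does not, because averaging partial averages over unequally sized groups changes the result:

```python
values = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

# Sum is commutative and associative: partial sums from a combine
# phase can be re-reduced in any grouping without changing the answer.
partials = [sum(values[:2]), sum(values[2:])]
assert sum(partials) == sum(values)

# A mean is neither: averaging the partial averages of groups of
# different sizes gives a different (wrong) answer.
def mean(xs):
    return sum(xs) / len(xs)

groups = [values[:2], values[2:]]            # sizes 2 and 4
mean_of_means = mean([mean(g) for g in groups])
print(mean_of_means, mean(values))  # 3.0 vs 3.5: not the same
```

This is why a reduce like sum or count can safely be pushed into a combine phase, while an order- or grouping-sensitive reduce cannot.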