Let's get started with some examples using Octopython, which is our lightweight MapReduce implementation that we shall be using for our programming assignments later this week. Let's take a look at how a word-count problem can be expressed in Octopython. This is a slight modification of the file which you'll find on the site as a demo example. The directory Gutenberg Small contains a bunch of small text files, much smaller than the ones which are available on the site, so that we can actually execute this program in a few seconds rather than a few hours.

Notice that we have defined two functions, the map and the reduce. The map function takes a key-value pair and emits other key-value pairs, and so does the reduce. Let's see what the map function is doing. The key, in this case, is the document ID or the document name, the file name here, and the value is the contents of that file. For every line in that file, and for every word in that line, the map function emits a pair, which is the word itself in lower case and the count of one. The reduce function gets a bunch of key-value pairs as well, one set from each of the mappers.
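The map and reduce just described can be sketched as follows. The function names `mapfn` and `reducefn` and their exact signatures are assumptions on my part, so check them against the demo file on the site:

```python
def mapfn(key, value):
    # key: the file name; value: the contents of that file.
    # Emit a (word, 1) pair for every word on every line, lower-cased.
    for line in value.splitlines():
        for word in line.split():
            yield word.lower(), 1

def reducefn(key, values):
    # key: a word; values: the list of 1-counts collected from all mappers.
    # Since every emitted count is one, the total is just the list's length.
    return len(values)
```

A tiny driver that groups the mapped pairs by key before reducing shows the same flow the lecture walks through.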
In this case, the keys are words, and the values are the counts, which are just the ones emitted by the mappers. And so, for every word, all the reduce function needs to do is figure out how many mappers found that word how many times. All it needs to do is compute the length of the list of values it gets from all the mappers for a particular word. Notice that the map functions are emitting counts of one, so it's okay for the reducer to merely sum up the list of values that it gets from all the mappers.

We'll now run this MapReduce program using Octo. This terminal is the server, so we start it by simply calling the Octo code with the keyword server, along with the program that implements the map and reduce functions, which is our program here. A server gets started, waiting for clients to join. We have two more shells, where we will start clients by merely calling Octo with the client keyword and telling it which server to connect to, that is, which machine the server is running on. The server in our case is running on this very machine itself, so we use localhost as the address for the server.
Let's start each of the clients, and as you can see, they are performing different map tasks on different files. Then, finally, they cooperate to perform all the reduce tasks, in the end writing the totals to a file as well as to the screen. So, for example, they have computed that across all these files there are four occurrences of "you", two occurrences of "yet", and many others which have scrolled up beyond our view.

The rest of the code is really used by the server to set things up. In particular, a source dictionary needs to get created: the key-value pairs that each mapper needs to get are kept in a dictionary. So here we have the file name as the key and the file contents as the value, for all the files in the directory Gutenberg Small. Notice also that both clients and the server get the same program. It's just that the clients implement the map and reduce operations, while the server merely gets data from where it needs to find it, sends it out to the mappers, and finally accumulates the data from the reducers and prints out the results.
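The source-dictionary setup just described might look like this minimal sketch. The function name `build_source` and the `*.txt` pattern are assumptions; the demo file on the site does the equivalent for Gutenberg Small:

```python
import glob
import os

def build_source(directory):
    # Build the dictionary the server hands out to the mappers:
    # file name as the key, file contents as the value.
    source = {}
    for path in glob.glob(os.path.join(directory, "*.txt")):
        with open(path) as f:
            source[os.path.basename(path)] = f.read()
    return source
```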
When you run this code for yourselves, some of you will notice that the mappers, or rather the processors in the map phase, are only implementing map operations and emitting counts of one, rather than, as we have described earlier, performing partial reduce operations on whatever documents they manage to get data for. Let's see how much data is actually produced by the map phase because of this behavior. Each word is emitted by a mapper multiple times. In fact, if the size of the initial set of documents is d, including every occurrence of every word, then after processing by the mappers exactly that much data is produced again, because each word is emitted every time it is seen. On the other hand, in our earlier description of the word-counting problem this was not the case, since the counts within each map process were actually summed up before moving on to the reduce phase.

This process of reduce operations being performed partially within each mapper is called a combine phase. In other words, when you sum up the word counts within every mapper before emitting, you're implementing the reduce function as a combine function, before the data get sorted by the MapReduce system.
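To make the data-volume point concrete, here is a small illustrative comparison (not part of the Octo demo): without combining, a mapper emits one pair per word occurrence, so the intermediate data is as large as the input; with per-mapper summation, it emits one pair per distinct word.

```python
def map_plain(text):
    # One (word, 1) pair per occurrence: output size tracks input size.
    return [(w.lower(), 1) for w in text.split()]

def map_combined(text):
    # One (word, count) pair per distinct word seen by this mapper.
    counts = {}
    for w in text.split():
        counts[w.lower()] = counts.get(w.lower(), 0) + 1
    return list(counts.items())
```

Both variants carry the same totals; only the number of emitted pairs differs.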
The result of this is a reduction in the amount of intermediate data that is produced, which leads to better parallel efficiency of the MapReduce process, as we'll see shortly. Let's see if we can modify our Octo program to implement a combine phase, where partial reduce operations are performed before passing the data on to the actual reduce phase. Unfortunately, Octo doesn't allow for a separate combine operation alongside the map and reduce functions. However, we can simulate the combine process by ensuring that each time a map function is called, it keeps track of its results in a local variable W.

The reason this is possible is because of a feature in Python called the generator: a function that, instead of returning a value, yields a value. It stays in memory, all the variables defined in it remain available the next time it is asked for a value, and it resumes executing exactly where it left off last time. So let's see how it works. Every time the map function is called for a particular document, it adds up the word counts for all the words that it sees in that document and stores them in this dictionary W. Then it goes through all the words it has found in this dictionary, with their counts, and yields them as key-value pairs.
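Following that description, the combining map function might be sketched like this. The name `mapfn` and its signature are assumptions, but `W` is the local dictionary from the lecture:

```python
def mapfn(key, value):
    # key: the file name; value: the file contents.
    W = {}
    # First pass: sum up the counts for every word in this document.
    for line in value.splitlines():
        for word in line.split():
            word = word.lower()
            W[word] = W.get(word, 0) + 1
    # Then yield each (word, per-document total) pair in turn; the
    # generator resumes here each time the caller asks for the next pair.
    for word, count in W.items():
        yield word, count
```

Each mapper now sends one pair per distinct word in its document, rather than one pair per occurrence.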
When the yield operation is called, the function actually returns this value to the caller, which is the Octo program. The next time the Octo program asks for a value, it yields the next count, and so on and so forth. In a sense, this is implementing the combiner within the map function. It's a little bit of cheating, because in real MapReduce implementations we actually have a separate combine function, in addition to the map and the reduce functions.

Let's now run our modified, combiner-enabled MapReduce task. We start the server, and then we start each client in turn. As you can see, they are performing the map and then the reduce phase, and now we are done. You might not have noticed earlier, but the server is actually printing out the time the combined MapReduce task took. In this case, it was twelve seconds. The actual Octo program available from the site doesn't have this timing, so I would encourage you to look through that code and insert the timing, so that you can reproduce such results. Further, do look back and figure out how much time the word-count program without the combiner actually took, and you'll see that the combine phase actually added a lot of efficiency.
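If you do add the timing yourself, one minimal way is to wrap whatever call starts the server-side job in a wall-clock timer. Here `run` stands in for that call; the name is an assumption, since the actual entry point is in the Octo code on the site:

```python
import time

def timed(run):
    # Run the job and report the elapsed wall-clock time.
    start = time.time()
    result = run()
    print("MapReduce task took %.1f seconds" % (time.time() - start))
    return result
```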
Last but not least, it's very important to realize that the combine function can work only if the reduce function is commutative and associative, because we don't know in which order these reduce operations are being performed, or on what data. So any reduce function which relies on being applied in a particular order to all the values for a particular key will not be able to be split up into a combine phase for efficiency purposes.
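A quick illustration of why this matters (my own example, not from the lecture code): summation can be split into partial sums in any grouping, but a reduce that takes the mean of its values cannot.

```python
values = [1, 2, 3, 4, 5, 6]

# Summation is associative and commutative: combining partial sums from
# two "mappers" gives the same answer as one global reduce.
assert sum([sum(values[:3]), sum(values[3:])]) == sum(values)

def mean(vs):
    return sum(vs) / len(vs)

# Taking the mean of partial means is wrong in general, so such a
# reduce cannot safely be split into a combine phase.
assert mean([mean(values[:2]), mean(values[2:])]) != mean(values)
```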