We're going to use k-means as an opportunity to present a really powerful and broadly used framework for parallel and distributed implementations of algorithms. This framework is called MapReduce.

We'll start by describing MapReduce in the context of a really simple word count example, which is the "hello world" example of MapReduce. And then we're going to get to how to apply the MapReduce framework to parallelize k-means.

Okay, but for now let's imagine that we have 10 billion documents and just a single machine, and we want to count the number of occurrences of every word in the corpus. So how are we going to do this? Well, to begin with, we're going to initialize a hash table, which we'll call count. Then, for every document d in our corpus, which we'll call the set of documents, and for every word in document d, we're simply going to increment our count of this word.

But of course, cycling through 10 billion documents and every word in each one of those documents, incrementing our count for each one of those words, and maintaining that hash table is going to be a really, really intensive task.

So instead the question is: what if I had a thousand machines available to me? Can I somehow parallelize or distribute this operation across these different machines?

So, in particular, let's imagine that we're going to just evenly distribute our data. So we're going to send 10 million documents to each one of our thousand machines. And then what we can think about doing is counting words in the documents per machine, separately, because when we're doing word counts, this is an operation that's independent across documents: the word counts in one document don't influence the word counts in some separate document. So this is an example of what's called a data-parallel task, that is, a task that can be done completely independently across the different data points. So what this implies is that we can think about just counting the occurrences of words separately in each one of our documents, and then merging these together.
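As a reference point, here is a minimal Python sketch of the single-machine counting loop described above. The names (count, corpus) are illustrative, not taken from the lecture's slides, and each document is assumed to already be a list of words. This same loop is also what each machine would run locally over its own 10 million documents.

```python
from collections import defaultdict

def word_count(corpus):
    """Count occurrences of every word in an iterable of documents,
    where each document is assumed to be a list of words."""
    count = defaultdict(int)      # hash table: word -> number of occurrences
    for d in corpus:              # for every document d in the corpus
        for word in d:            # for every word in document d
            count[word] += 1      # increment our count of this word
    return count
```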
Okay, so we can do these counts per machine and then simply add these counts together. So to be clear, per machine we run exactly the code that we had on the previous slide, but now it's only over 10 million documents, not 10 billion documents. But then, once we have these hash tables, we're going to combine them together. So we have that the total count of a given word in the entire corpus is obtained by summing, over each one of our thousand machines, the local count of that word.

Okay, but here we've shown this just for a single word. And remember, we want to do this for every word in the vocabulary that is present in the corpus. So how do we do this? Well, it seems like we're now back to a sequential problem: we have to go through each one of our words and do this merge of the hash tables. So just to make sure that's clear, we have to cycle through all words in our vocabulary, and I'm not happy about doing that.

So can we do something even better? Well, let's imagine that we have a whole bunch of machines available for this word counting, and a whole bunch of machines available when we go to merge these hash tables and form our counts of all the words in the vocabulary.

Okay, so what we can do is divide this into two different phases. One phase where we have an operation that's parallel over documents, which is counting the words that appear in each one of these documents. So, for example, maybe this machine is going to produce five counts of one word, ten counts of the word "machine", and seven counts of the word "learning". And maybe this other machine also has some counts of the word "learning"; maybe here we have three counts, as well as counts of other words that appeared in the documents on this machine. So that's the step that we described on our previous slide.

But now what we're going to do is utilize multiple machines when we go to do the counts across the entire corpus, in particular when we go to merge these different counts that we've made. And to do this, and this is the really key step, we're going to send all counts of a given word to a single machine.
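Before looking at the distributed version, here is a hedged sketch of the naive, single-machine merge just described, assuming each machine hands back its local count dictionary. The loop over every word in every machine's table is exactly the sequential bottleneck that the second phase is designed to remove.

```python
from collections import defaultdict

def merge_counts(local_counts):
    """Naively merge per-machine hash tables on a single machine.
    `local_counts` is assumed to be a list of dicts, one per machine."""
    total = defaultdict(int)
    for machine_count in local_counts:          # one hash table per machine
        for word, c in machine_count.items():   # cycle through every word it saw
            total[word] += c                    # total count = sum of local counts
    return total
```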
So all counts of the word "learning" that appear on any one of these thousand machines are all going to get sent to one machine. Let's say "learning" all goes to this machine here, and this machine is then going to sum these counts. And in this case, maybe we would output "learning", and if these were the only two instances, it would output "learning" with a count of 10.

And the important thing about being able to do this second phase in a distributed manner is the fact that the operation that we're doing when we're merging these different hash tables is, again, an operation that can be implemented independently, now over words. So this is an operation that we'll call data parallel over words, and that's what allows us to have this efficient structure for merging the hash tables.

Okay, but the critical component here is making sure that we send all counts of a given word to a single machine, so that whatever aggregation over these counts is produced on this machine here corresponds to all the counts of that word in the entire corpus. And how are we going to do this? Well, what we're going to do is use something that's called a hash function. And let's describe this in a little bit more detail.

So which words go to machine i? Well, our hash function h is going to be something that's defined over our entire vocabulary, so over the indices of however many words we have in the vocabulary. That's going to be the input to the function, and the output is going to be in the range 1, 2, all the way to the number of machines that we have available. And so, in particular, we'll send counts of the word "learning" to machine h("learning").

Okay, so to step back, what we've seen is that we had an operation that we could distribute across documents, which was simply to count how many instances of a given word appeared in each document. And then we had a step that we could distribute across the unique words, which was to add up all the counts that we had across these different documents, merging them to get a total count of a given word in the corpus. And this is an example of something that falls into what's called the MapReduce abstraction.
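To make that routing concrete, here is a hedged sketch of the hash-based shuffle. The particular hash function, its 0-based range (rather than 1 to the number of machines, as on the slide), and NUM_MACHINES are all illustrative assumptions, not the lecture's actual implementation. The key property is that the hash is deterministic, so every machine agrees on where a given word's counts should be sent, and summing within each machine is itself data parallel over words.

```python
import hashlib
from collections import defaultdict

NUM_MACHINES = 1000

def h(word):
    """Deterministic hash mapping a word to a machine index in 0..NUM_MACHINES-1."""
    return int(hashlib.md5(word.encode("utf-8")).hexdigest(), 16) % NUM_MACHINES

def shuffle_and_reduce(local_counts):
    """Route each word's local counts to the machine chosen by h(word) and sum there.
    `local_counts` is assumed to be a list of per-machine count dicts."""
    per_machine = [defaultdict(int) for _ in range(NUM_MACHINES)]
    for machine_count in local_counts:
        for word, c in machine_count.items():
            per_machine[h(word)][word] += c   # all counts of "learning" meet on one machine
    return per_machine                        # machine i now holds corpus-wide totals for its words
```

In a real MapReduce system this shuffle happens over the network rather than in one process; the sketch is only meant to show why sending all counts of a word to a single machine makes the merge a per-word, embarrassingly parallel reduction.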