25 June 2012
Let’s say you have a tab-separated file in HDFS that contains only unique entities, one per line, and you would like to assign a unique, monotonically increasing ID to each. So for N records, the IDs should be 0..N-1. Now, if the file is small, you will probably do something very similar to this:
hadoop fs -cat /some-file.tsv | cat -n | hadoop fs -put - /some-file-with-ids.tsv
(if it’s comma-separated or something else, you’ll probably throw in some awk to fix that)
And we’re done! However, for big files, this is problematic, because all data has to go through one single process on one single machine. Due to the lack of parallelism in this case, the disk or network will be a bottleneck. In this post we’ll show a distributed way of doing this, implemented in Hadoop MapReduce.
We have some requirements:
So, we really need a distributed ‘cat -n’. Who’d have thought that would be more involved than a distributed ‘grep’? Anyway, here’s a solution:
So, the trick is to use the mapping phase to build up enough global knowledge about the data set to know exactly how many records there are and how they are partitioned across the reducers. The framework is then used as a sort of giant synchronization barrier, after which each reducer knows exactly how many records preceded its portion of the data.
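To make the arithmetic concrete, here is a minimal sketch in plain Java (no Hadoop involved) of how per-mapper counts turn into a starting offset for each reducer. The class and method names are hypothetical and are not taken from the actual job; the counts in `main` are the ones from the worked example in this post.

```java
import java.util.Arrays;

// Hypothetical helper, only meant to illustrate the offset arithmetic.
class OffsetSketch {
    // counts[m][p] = number of records mapper m sent to reducer (partition) p.
    // Returns, for each reducer, how many records went to lower-numbered reducers.
    static long[] startOffsets(long[][] counts, int numReducers) {
        long[] offsets = new long[numReducers];
        for (long[] mapperCounts : counts) {
            long preceding = 0;
            for (int p = 0; p < numReducers; p++) {
                offsets[p] += preceding;        // records this mapper sent to partitions 0..p-1
                preceding += mapperCounts[p];
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        long[][] counts = { {2, 2, 1}, {1, 2, 1} };
        System.out.println(Arrays.toString(startOffsets(counts, 3))); // prints [0, 3, 7]
    }
}
```

In the real job no single process ever sees the whole matrix: each mapper ships its own partial sums and each reducer adds up what it receives.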
Here is an illustrated / pseudo-coded example of the whole process. The job in the example has two mappers and three reducers. The partitioning is based on a fictional hash function that gives the results seen between parentheses. Each mapper keeps an array of counters for keeping track of how many records it sent to each reducer. Note that both the mapper and the partitioner need to know about the hashing function in order to work.
The input file is this:
ABC (hash == 0)
DEF (hash == 1)
GHI (hash == 2)
JKL (hash == 0)
MNO (hash == 1)
------ split ------ (above here goes to mapper 0, below to mapper 1)
PQR (hash == 2)
STU (hash == 0)
VWX (hash == 1)
YZ0 (hash == 1)
The job will run as follows:
Mapper 0:
  setup:
    initialize counters = [0, 0, 0]
  map:
    input "ABC" ==> emit "ABC", increment counters[0]
    input "DEF" ==> emit "DEF", increment counters[1]
    input "GHI" ==> emit "GHI", increment counters[2]
    input "JKL" ==> emit "JKL", increment counters[0]
    input "MNO" ==> emit "MNO", increment counters[1]
    //now counters == [2, 2, 1]
  cleanup:
    emit counter for partition 0: none
    emit counter for partition 1: counters[0] = 2
    emit counter for partition 2: counters[0] + counters[1] = 4

Mapper 1:
  setup:
    initialize counters = [0, 0, 0]
  map:
    input "PQR" ==> emit "PQR", increment counters[2]
    input "STU" ==> emit "STU", increment counters[0]
    input "VWX" ==> emit "VWX", increment counters[1]
    input "YZ0" ==> emit "YZ0", increment counters[1]
    //now counters == [1, 2, 1]
  cleanup:
    emit counter for partition 0: none
    emit counter for partition 1: counters[0] = 1
    emit counter for partition 2: counters[0] + counters[1] = 3

========================= START SORT / SHUFFLE / MERGE =========================
The framework sorts all key+value pairs based on the keys. We use keys that
only make sure that the counters are sorted above the actual lines. Also, we
use a specialized grouping comparator that creates only one group per reducer,
so each reduce method will be called only once, with the counters sorted on top
and all the records after that. Scroll down for the actual implementation of
these.
========================= END SORT / SHUFFLE / MERGE ===========================

Reducer 0:
  reduce:
    initialize offset = 0
    input "ABC" ==> emit "offset <tab> ABC", increment offset //emits 0<tab>ABC
    input "JKL" ==> emit "offset <tab> JKL", increment offset //emits 1<tab>JKL, and so on...
    input "STU" ==> emit "offset <tab> STU", increment offset
    //last emitted offset is 2 from here

Reducer 1:
  reduce:
    initialize offset = 0
    input counter with value 2 ==> offset = offset + 2
    input counter with value 1 ==> offset = offset + 1
    //now offset == 3
    input "DEF" ==> emit "offset <tab> DEF", increment offset //emits 3<tab>DEF
    input "MNO" ==> emit "offset <tab> MNO", increment offset //emits 4<tab>MNO, and so on...
    input "VWX" ==> emit "offset <tab> VWX", increment offset
    input "YZ0" ==> emit "offset <tab> YZ0", increment offset
    //last emitted offset is 6 from here

Reducer 2:
  reduce:
    initialize offset = 0
    input counter with value 4 ==> offset = offset + 4
    input counter with value 3 ==> offset = offset + 3
    //now offset == 7
    input "GHI" ==> emit "offset <tab> GHI", increment offset //emits 7<tab>GHI
    input "PQR" ==> emit "offset <tab> PQR", increment offset //emits 8<tab>PQR
The resulting output will be this:
0 ABC
1 JKL
2 STU
3 DEF
4 MNO
5 VWX
6 YZ0
7 GHI
8 PQR
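The walkthrough can also be replayed in a single process, which is a handy way to convince yourself that the numbers add up. The following is a rough simulation in plain Java, not the code from RowNumberJob.java: the class name, the method name and the idea of passing the fictional hash as a lookup table are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Single-process replay of the example; all names here are hypothetical.
class RowNumberSimulation {
    static List<String> run(List<List<String>> splits, Map<String, Integer> hash, int numReducers) {
        // Per reducer: the counter values and the data records it receives.
        List<List<Long>> counterRecords = new ArrayList<>();
        List<List<String>> dataRecords = new ArrayList<>();
        for (int r = 0; r < numReducers; r++) {
            counterRecords.add(new ArrayList<>());
            dataRecords.add(new ArrayList<>());
        }

        for (List<String> split : splits) {             // one split == one mapper
            long[] counters = new long[numReducers];    // setup
            for (String line : split) {                 // map
                int partition = hash.get(line);
                dataRecords.get(partition).add(line);
                counters[partition]++;
            }
            long preceding = 0;                         // cleanup
            for (int p = 1; p < numReducers; p++) {
                preceding += counters[p - 1];
                counterRecords.get(p).add(preceding);   // counter record for reducer p
            }
        }

        List<String> output = new ArrayList<>();
        for (int r = 0; r < numReducers; r++) {         // one iteration == one reducer
            long offset = 0;
            for (long count : counterRecords.get(r)) {  // counters are seen first
                offset += count;
            }
            for (String line : dataRecords.get(r)) {
                output.add(offset + "\t" + line);
                offset++;
            }
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Integer> hash = Map.of("ABC", 0, "DEF", 1, "GHI", 2, "JKL", 0, "MNO", 1,
                                           "PQR", 2, "STU", 0, "VWX", 1, "YZ0", 1);
        List<List<String>> splits = List.of(List.of("ABC", "DEF", "GHI", "JKL", "MNO"),
                                            List.of("PQR", "STU", "VWX", "YZ0"));
        run(splits, hash, 3).forEach(System.out::println);
    }
}
```

The simulation hands records to each reducer in mapper order. On a real cluster the order of records within one reducer is not guaranteed, but the set of IDs each reducer hands out stays the same.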
Note that the ordering of records was not preserved in the process. The job will re-order the records arbitrarily. If the result needs to be sorted in some way, one solution is to pre-sample the data and do total order partitioning (just like the terasort example). For this to work, the mappers would also need to know about the partition boundaries that the total order partitioner uses.
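As a rough idea of what "knowing about the partition boundaries" could look like, here is a small sketch in plain Java of a range lookup that a mapper could use in place of the hash function when picking the counter to increment. This is not Hadoop's own total order partitioner; the names, the example boundaries and the rule for keys that equal a boundary are assumptions.

```java
import java.util.Arrays;

// Hypothetical range lookup; not taken from Hadoop or from the original job.
class BoundarySketch {
    // boundaries must be sorted; n boundaries define n + 1 partitions.
    // Assumed convention: a key equal to a boundary goes to the partition above it.
    static int partitionFor(String key, String[] boundaries) {
        int pos = Arrays.binarySearch(boundaries, key);
        // Found: the partition just above that boundary. Not found: the insertion point.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        String[] boundaries = {"H", "Q"};                     // made-up split points
        System.out.println(partitionFor("ABC", boundaries));  // prints 0
        System.out.println(partitionFor("JKL", boundaries));  // prints 1
        System.out.println(partitionFor("STU", boundaries));  // prints 2
    }
}
```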
There are two source files: RowNumberJob.java and RowNumberWritable.java. The former is the job (with mapper and reducer implementations) and the latter is the writable implementation used for intermediate values and the partitioner. Below is a brief comment on some of the moving parts.
The implementation of this job has three special parts (other than a mapper and reducer implementation):
In Hadoop, the partitioner can base its partitioning decision on both the key and the value at hand. We make convenient use of this fact by partitioning on the value. If the value is a counter record, we look at the given partition (set in the mapper code). If the value is an actual record, we partition based on the hash of the record. The hashing logic is also exported from the partitioner (as a static method), so that the mapper code can use the same logic to determine which counters to increment.
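The decision itself is tiny. Below is a plain-Java model of it, deliberately written without the Hadoop classes so it runs on its own; in the real job this logic lives in the partitioner and the writable. The method names and the boolean/int parameters standing in for the fields of a counter record are hypothetical. Masking the sign bit before taking the modulo is the same idiom that Hadoop's default HashPartitioner uses.

```java
// Plain-Java model of the partitioning rule; all names are hypothetical.
class PartitionSketch {
    // Shared by the mapper (to pick the counter to increment) and the partitioner.
    static int partitionForRecord(String record, int numPartitions) {
        // Mask the sign bit so a negative hashCode() cannot yield a negative partition.
        return (record.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // isCounter and targetPartition model what a counter value would carry.
    static int getPartition(boolean isCounter, int targetPartition, String record, int numPartitions) {
        return isCounter ? targetPartition : partitionForRecord(record, numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(getPartition(true, 2, null, 3));   // counter record: prints 2
        System.out.println(getPartition(false, -1, "ABC", 3)); // data record: hash-based
    }
}
```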
If you have been paying attention, you know that I rely on sorting to make sure that the counter records go on top. Now in order to make sure this happens, we don’t need to implement anything special. We just emit the counter records with a key that sorts before the key we use for value records. In this case, we use a standard ByteWritable, with the values ‘T’ for counter and ‘W’ for value (the Dutch will understand).
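A quick way to see why this works: ‘T’ is byte 84 and ‘W’ is byte 87, so anything keyed with ‘T’ sorts first. The sketch below models the key/value pairs with a small hypothetical `Pair` class in plain Java and sorts them by key only, assuming the keys compare by their byte value.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the intermediate pairs; not the job's actual classes.
class KeyOrderSketch {
    static final byte COUNTER_KEY = (byte) 'T';
    static final byte VALUE_KEY = (byte) 'W';

    static final class Pair {
        final byte key;
        final String payload;
        Pair(byte key, String payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    // Sorts on the key only and returns the payloads in the resulting order.
    static List<String> sortedPayloads(List<Pair> pairs) {
        List<Pair> sorted = new ArrayList<>(pairs);
        sorted.sort(Comparator.comparingInt((Pair p) -> p.key)); // List.sort is stable
        List<String> out = new ArrayList<>();
        for (Pair p : sorted) {
            out.add(p.payload);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Pair> pairs = List.of(new Pair(VALUE_KEY, "DEF"), new Pair(COUNTER_KEY, "count=2"),
                                   new Pair(VALUE_KEY, "MNO"), new Pair(COUNTER_KEY, "count=1"));
        System.out.println(sortedPayloads(pairs)); // prints [count=2, count=1, DEF, MNO]
    }
}
```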
Since we emit the counter records only once from the mapper, we receive them only once on the reducer side. Each reducer just emits whatever it receives in terms of actual data records. Hence, there is no point in grouping things. Remember that we use the key to make the distinction between counters and actual records (we need to in order to sort the counters on top). If we didn’t create a custom grouping comparator, the counters and actual values would appear as two different groups in the reducer. In order to get everything into one group, we implement a comparator that always returns 0 (meaning everything is equal).
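The effect of such a comparator can be shown without Hadoop. The sketch below is a simplified model, with hypothetical names, of how a reducer's sorted input is cut into groups: a new group starts whenever the grouping comparator says two neighbouring keys differ. In the real job the comparator would be a Hadoop comparator class registered on the job (for example via `Job.setGroupingComparatorClass`).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified model of reduce-side grouping; all names are hypothetical.
class GroupingSketch {
    // The comparator described in the text: every key is equal to every other key.
    static final Comparator<Byte> ALWAYS_EQUAL = (a, b) -> 0;

    // Cuts an already sorted key sequence into groups of "equal" neighbours.
    static List<List<Byte>> group(List<Byte> sortedKeys, Comparator<Byte> grouping) {
        List<List<Byte>> groups = new ArrayList<>();
        Byte previous = null;
        for (Byte key : sortedKeys) {
            if (previous == null || grouping.compare(previous, key) != 0) {
                groups.add(new ArrayList<>());   // keys differ: start a new group
            }
            groups.get(groups.size() - 1).add(key);
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Byte> keys = List.of((byte) 'T', (byte) 'T', (byte) 'W', (byte) 'W', (byte) 'W');
        System.out.println(group(keys, Comparator.naturalOrder()).size()); // prints 2
        System.out.println(group(keys, ALWAYS_EQUAL).size());              // prints 1
    }
}
```

With one group per reducer, the reduce method is called once and can keep a running offset in a local variable while it iterates over the counters and then the records.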
We read all the data, write it back in a different order, push most of it through the network and then read and write it once more. This is a lot of trouble just to do line numbering. Is it really worth it? See these results:
Input size 1.45GB ==> cat wins
  Pipe through cat: 0m23s
  MapReduce: 0m33s (50 mappers, 3 reducers)
Input size 145GB ==> MapReduce wins
  Pipe through cat: 44m03s
  MapReduce: 7m36s (2633 mappers, 80 reducers)
This test was run on a 16 node cluster with 12 disks/node, 32GB RAM, 1Gb NICs. No fancy tuning was done apart from some basics. Input and output were uncompressed, intermediate data was compressed with the Snappy codec. The pipe through cat test was done from the master node of the cluster and was bound by using only a single disk at a time.