Thursday, June 21, 2012

Parallel Processing of File Data, Iterator groups and Sequences FTW!

I occasionally need to process very large files, and Scala handles this well in general.  The Iterator you get from a BufferedSource's getLines has a nice feature that lets you break file parsing or processing into chunks so that the work can be parallelized.

If you've tried the obvious solution of simply adding .par, you'll have found that the method isn't present on an Iterator.  So, you might convert to a List with toList.  When you convert like this, Scala reads every line into an in-memory List before passing it on.  If you have a large file, you'll quickly run out of memory and your process will crash with an OutOfMemoryError.

Iterator offers us another way to do this with the grouped() method call.  You can pass a group size into the method call to break your stream into a sequence of chunks.  So, instead of a single Iterator of Strings with millions of entries, one for each line, you get an Iterator of Sequences with 10,000 lines in each.  A BufferedSource is itself a kind of Iterator (of characters), getLines returns an Iterator of lines, and any kind of Iterator can be grouped in this way; Sequences and Lists have a grouped method as well.  Each chunk is a Sequence with a finite element count, so you can process it in parallel to increase throughput, and flatMap the results back together at the end.
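To see the chunking on its own, here is a minimal sketch that uses only the standard library. The names GroupedSketch, chunkSizes and linesPulledForFirstChunk are made up for illustration; the second helper counts how many lines have been pulled from the source once the first chunk has been read, which is what keeps memory use bounded.

```scala
object GroupedSketch {
  /** Sizes of the chunks that grouped(size) produces for the given lines. */
  def chunkSizes(lines: Iterator[String], size: Int): List[Int] =
    lines.grouped(size).map(_.size).toList

  /** Reads only the first chunk and reports how many lines were pulled from the source. */
  def linesPulledForFirstChunk(total: Int, size: Int): Int = {
    var pulled = 0
    // A lazy source that counts every line it actually hands out.
    val source = Iterator.range(0, total).map { i => pulled += 1; s"line $i" }
    val chunks = source.grouped(size)
    if (chunks.hasNext) chunks.next() // materializes one chunk only
    pulled
  }
}
```

The last chunk is simply shorter when the line count is not a multiple of the group size, so nothing is dropped.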

The full pipeline looks something like this:

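// Read stdin lazily in chunks of 10,000 lines, so only one chunk is held in memory at a time.
// Each chunk is parsed in parallel, and the first flatMap joins the chunks back into one stream.
// The second flatMap only flattens further if parseItem returns an Option or a collection of LogRecord.
io.Source.stdin.getLines().grouped(10000).flatMap { chunk =>
      chunk.par.map(line => LogParser.parseItem(line))
    }.flatMap(x => x).foreach { (record: LogRecord) =>
      println(record.toString)
    }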

So with this, we can read lines from stdin as a buffered source, and also parallelize without the need to hold the entire dataset in memory!

At the moment, I couldn't find an easy way to get Scala to raise the parallelization level beyond your CPU core count.  As far as I know, this kind of I/O splitting isn't what the parallel collections were designed for; it's more a job for Akka or similar.  Fortunately, in Scala 2.10, we'll get Promises and Futures, which should make this kind of thing much more powerful and give us more knobs and dials to turn on the concurrency configuration.  Hopefully I'll post on that when it happens!
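For anyone on Scala 2.10 or later, here is a rough sketch of what the Futures version might look like, with the thread count as an explicit knob. This is an illustration and not a tested replacement for the pipeline above: ChunkedFutures, parse and processAll are hypothetical names, and parse is just a stand-in for LogParser.parseItem.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ChunkedFutures {
  // Hypothetical stand-in for LogParser.parseItem.
  def parse(line: String): Int = line.trim.length

  /** Parses lines chunk by chunk on a pool of `threads` threads, passing results to `handle` in order. */
  def processAll(lines: Iterator[String], chunkSize: Int, threads: Int)(handle: Int => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      lines.grouped(chunkSize).foreach { chunk =>
        // Split the chunk into roughly one batch per thread.
        val batchSize = math.max(1, (chunk.size + threads - 1) / threads)
        val futures = chunk.grouped(batchSize).map(batch => Future(batch.map(parse))).toList
        // Wait for the whole chunk before reading the next one, so memory stays bounded.
        val parsed = Await.result(Future.sequence(futures), Duration.Inf)
        parsed.flatten.foreach(handle)
      }
    } finally pool.shutdown()
  }
}
```

Something like ChunkedFutures.processAll(io.Source.stdin.getLines(), 10000, 16)(println) would then run with 16 threads regardless of the core count. Blocking on each chunk is the simple choice here; it trades a little throughput for a hard cap on how much of the file is in memory.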
