Monday, July 9, 2012

Data processing, procedural, functional, parallelism and being Captain Obvious

I'm not going to tell you anything you don't already know in this post.  I might, however, manage to aggregate some things you already knew into one place and make them dance to a slightly new tune.

At the heart of this post is something of an epiphany I had today.  It has to do with how code is written to do data processing, a task that is very common for programmers, perhaps even ubiquitous.

Ultimately, data processing almost always looks something like this:

    load -> parse -> transform -> filter -> output

You load some stuff, parse it, transform it, filter it and output it.  Those steps may happen in a different order, but ultimately it comes down to something like that.

One of the things you already know is that the implementation of this should look like a production line.  Read a datum in, send it through the pipeline, rinse and repeat, batching as needed for efficiency.

The amazing thing is that when you look at implementations, they often end up looking like this:

    load everything -> transform or copy the whole list -> filter in place -> save the whole lot out

Code is written that loads the entire set into memory as a list of objects, which then passes through methods that change that list, either by transforming the objects themselves or, worse, by copying the list in its entirety to another list of different objects, filtering the list in situ, then saving the whole lot out.  As a result, these programs require an amount of RAM at least as large as the data itself.  Everybody knows this is a bad way to do things, so why do people keep writing code that looks like this?
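
To make that concrete, here is a minimal sketch of the shape I mean (the file name, types and steps are hypothetical, not taken from any real project):

// The whole data set is materialised as a List before any processing happens,
// and every step produces another full-sized List.
object LoadItAllPipeline {
  case class Record(raw: String)

  def main(args: Array[String]): Unit = {
    val everything: List[Record] =
      io.Source.fromFile("input.data").getLines().map(line => Record(line)).toList

    val transformed: List[Record] = everything.map(r => Record(r.raw.trim))   // full copy
    val filtered: List[Record]    = transformed.filter(_.raw.nonEmpty)        // another copy

    filtered.foreach(println)
  }
}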

We all know it should look more like this:

    read a record -> process it -> output it, rinse and repeat (batching where it helps)
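
In Scala, a minimal sketch of that production-line shape might look like this (again, the file name and steps are hypothetical):

// Nothing is held in memory beyond the current line and, for output, the current batch.
object StreamingPipeline {
  def main(args: Array[String]): Unit = {
    io.Source.fromFile("input.data")
      .getLines()                               // a lazy Iterator[String]
      .map(_.trim)                              // transform one line at a time
      .filter(_.nonEmpty)                       // filter as we go
      .grouped(1000)                            // batch purely for output efficiency
      .foreach(batch => batch.foreach(println))
  }
}
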
I think this is perhaps the tune to which many have not given much thought.  The problem just pops up, and people start scrambling to fix it, trying to dance triple time to the beat of the old drum.  I believe one significant cause may be time and effort.  Data processing code often starts life as a humble script before it morphs into something bigger.  Most scripts are written in procedural languages, and in those environments parallelization and streaming are more complicated to write than loading in the whole file and passing it around as an object, so people default to the latter.  Why write a mass of code dealing with thread pools and batching when you don't have to?  (I know there are libraries and frameworks, but often people don't know them, or don't have time to use them.)

This problem is easy to solve in a language where functions are first-class values.  For each step in the flow, you define a function that performs that operation, no different than in the procedural version.  But instead of each step taking a value as input and returning a new value directly, the functional variant returns the transformation itself: a function that takes an object and returns an object.  The flow can then be defined as a function that composes a list of those transform functions, itself yielding a single function from object to object.  Now we can apply that flow to any object, single, multiple or otherwise, very easily, because the flow itself is just a value.
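
As a rough sketch of what that can look like in Scala (the Item type and step names are made up for illustration; myFlow and output here are stand-ins for the ones used in the one-liner below):

// Each step is a plain function value; the flow is just another function
// obtained by composing them, so it can be passed around like any other value.
object FlowAsValue {
  case class Item(value: String)

  val parse: String => Item   = line => Item(line.trim)
  val transform: Item => Item = item => Item(item.value.toUpperCase)

  // Compose a list of transform steps into a single Item => Item function.
  def flow(steps: List[Item => Item]): Item => Item = steps.reduce(_ andThen _)

  // The whole flow is itself just a value: parse, run the transforms, then
  // filter by returning None for items we want to drop.
  val myFlow: String => Option[Item] =
    parse andThen flow(List(transform)) andThen (item => if (item.value.nonEmpty) Some(item) else None)

  def output(item: Item): Unit = println(item.value)

  def main(args: Array[String]): Unit =
    List("foo", " bar ", "  ").flatMap(myFlow(_)).foreach(output(_))
}
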
In Scala, you have streaming sequences, so it becomes as easy as:

io.Source.fromURL("http://www.example.com/foo.data").getLines().flatMap(myFlow(_)).foreach(output(_))


In Scala, there are some helpers that can then apportion and parallelize this quite easily, which I talked about in my previous post.  Now that our primary value is a process rather than a chunk of data, parallelization becomes much easier: passing our processing function around between threads is far simpler than coping with a big chunk of mutable data being shared about.
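
For example, with the parallel collections in the standard library, a sketch (reusing the hypothetical myFlow and output from above) might look like this:

// Stream the input in batches and let a parallel collection fan each batch
// out over a thread pool; only the flow function is shared between threads.
io.Source.fromURL("http://www.example.com/foo.data")
  .getLines()
  .grouped(10000)                                       // keep memory bounded
  .foreach(batch => batch.par.flatMap(myFlow(_)).seq.foreach(output(_)))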

You can implement this pattern in Java, or C++, or Perl, but most people have to stop and think to do so; those languages don't give it to you for free.  From what I'm learning, this is a very common pattern in functional programming.  In fairness, it's a common pattern in Java too, but many folks never think of it as the default choice until it's already too late.

Monday, July 2, 2012

Logging and Debugging

I'm finding that one of the biggest challenges working with Scala is debugging, and secondarily logging.  The former seems to be a tooling issue as much as anything, and to be honest, the latter is a matter of my putting time into figuring it out.

With debugging, breakpoints in the middle of highly condensed list comprehensions are very, very hard to set.  I end up mangling the code with assignments and code blocks that I then have to re-condense later.
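
For illustration, the kind of mangling I mean looks something like this (a made-up example, not real project code): a one-line chain gets unrolled into named steps just so the debugger has somewhere to stop.

// Before: one condensed chain, nowhere convenient to break.
// def jsonPaths(dir: String): List[String] =
//   new java.io.File(dir).listFiles().toList.filter(f => Character.isDigit(f.getName.charAt(0))).map(_.getPath).filter(_.endsWith(".json"))

// After: temporarily unrolled so a breakpoint can land on each step.
def jsonPaths(dir: String): List[String] = {
  val files  = new java.io.File(dir).listFiles().toList
  val digits = files.filter(f => Character.isDigit(f.getName.charAt(0)))
  val paths  = digits.map(_.getPath)
  paths.filter(_.endsWith(".json"))      // breakpoint-friendly, but has to be re-condensed later
}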

I've attached a debugger using the usual JDWP method, but it slows everything down badly, and it's just not that much better than print statements.  I've been going through the Koans with a new employee at work, and it's been helping both of us greatly.  There's one koan that describes a way to sort of "monkey patch" objects, and as much as I dislike that approach in general, it sure as heck beats Aspects, which are hard to control and often fickle unless they are part of your daily routine.

I came up with a small monkey patch for the List class that lets me use inline log function calls to get basic information about the state of a list in the middle of a comprehension chain, so I include it here in the hopes that somebody will find it useful, or have some better ideas!

// Wraps a List to add inline logging methods that return the original list,
// so they can be dropped into the middle of a comprehension chain.
class ListLoggingWrapper[+T](val original: List[T]) {
  // Logs the message along with the list's size.
  def log(msg: String): List[T] = {
    println(msg + " " + original.size)
    original
  }
  // Logs the message along with the list's contents, truncated to keep output manageable.
  def logSelf(msg: String, truncateTo: Int = 4096): List[T] = {
    println(msg + " " + original.toString().take(truncateTo))
    original
  }
}

// Implicit conversion (the "monkey patch") that makes log and logSelf available on any List.
implicit def monkeyPatchIt[T](value: List[T]): ListLoggingWrapper[T] = new ListLoggingWrapper[T](value)

This helpful snippet lets you call a new method 'log' on a List that prints out the list's size, and similarly 'logSelf', which prints out the result of toString, truncated (I've found that with large lists you always end up with pages of hard-to-pick-through output if you don't truncate).
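
With the implicit conversion in scope, a quick example of the two methods in a trivial chain (values made up):

List(1, 2, 3, 4).log("before filter")        // prints: before filter 4
  .filter(_ % 2 == 0).log("after filter")    // prints: after filter 2
  .logSelf("survivors", 80)                  // prints: survivors List(2, 4)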

A list comprehension chain ends up looking something like this:

import java.io.File
import scala.collection.JavaConverters._   // for asScala on the java.util.Map below
import com.codahale.jerkson.Json            // Jerkson's JSON parser

Util.getJsonFilePaths(args(0)).map {
  x: String =>
    new File(x).listFiles().toList.log("File List Size").filter(file => {
      Character.isDigit(file.getName.charAt(0))
    }).map(_.getPath).filter(_.endsWith(".json")).log("Json File Count").flatMap {
      jsonFile =>
        io.Source.fromFile(jsonFile).getLines().toList.log("Line Count for " + jsonFile)
          .map(line => Json.parse[List[Map[String, Object]]](line))
          .flatMap(x => x)
          .log("Elements in file")
          .logSelf("Elements are", 255)
          .filter(jsonEntry => {
            jsonEntry.get("properties").get.asInstanceOf[java.util.Map[String, Object]].asScala.get("filterPropertyHere") match {
              case None => false
              case Some(value) if (value.toString == "0") => false
              case Some(value) if (value.toString == "1") => true
              case _ => false
            }
          })
    }
}

That's a piece of code that aggregates data across multiple JSON files, filtering on a given property, using Jerkson (which I still feel like I'm missing something with, as it seems harder than it should be).