A Little Gotcha With Node.js Async and Streams

08.10.2012

I stumbled across a little gotcha using async with Node.js Streams: you can easily corrupt your output if you are not careful.

Node.js Streams are an abstraction of Unix pipes; they let you push or pull data a little bit at a time, never keeping more in memory than is needed. async is a library used to organize all the asynchronous callbacks used in Node applications without falling into the kind of "Christmas Tree" deep nesting of callbacks that can occur all too easily.
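
As a quick sketch of the shape async helps avoid (using plain fs here just for illustration; this isn't from the image code):

  fs = require "fs"

  # Read three files in sequence; each step nests inside the previous
  # callback, and the error check repeats at every level.
  fs.readFile "a.txt", (err, a) ->
    return console.error err if err
    fs.readFile "b.txt", (err, b) ->
      return console.error err if err
      fs.readFile "c.txt", (err, c) ->
        return console.error err if err
        console.log a.length + b.length + c.length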

I'm working on a little bit of code to pull an image file, stored in MongoDB GridFS, scale the image using ImageMagick, then stream the result down to the browser.

My first pass at this didn't use ImageMagick or streams, and worked perfectly ... but as soon as I added in the use of async (even before adding in ImageMagick), I started getting broken images in the browser, meaning that my streams were getting corrupted.

Before adding async, my code was reasonable:
  app.get "/images/review-thumbnail/:id", (req, res) ->

    id = req.params.id

    store = new GridStore mongoose.connection.db, new ObjectID(id), null, "r"

    # file is just an alias for store but has some semantic value
    store.open (err, file) ->

      if err
        return res.send "Unable to open file", 500

      if file.eof()
        return res.send "Image #{id} not found.", 404

      res.contentType file.contentType

      file.stream(true).pipe res

However, I knew I was going to add a few new steps here to pipe the file content through ImageMagick; that's when I decided to check out the async module.

The logic for handling this request is a waterfall; each step kicks off some work, then passes data to the next step via an asynchronous callback. The async library calls the steps "tasks"; you pass an array of these tasks to async.waterfall(), along with the end-of-waterfall callback. This special callback may be passed an error provided by any task, or the final result from the final task.

With waterfall(), each task is passed a special callback function. If the callback function is passed a non-null error as the first parameter, then remaining tasks are skipped, and the final result handler is invoked immediately, to handle the error.

Otherwise, you pass null as the first parameter, plus any additional result values. The next task is passed the result values, plus the next callback. It's all very clever.
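
Here's a minimal sketch of that protocol, with two trivial tasks of my own invention just to show the shapes:

  async = require "async"

  addOne = (callback) ->
    # first task: no error, one result value
    callback null, 1

  double = (value, callback) ->
    # next task: receives the previous result, plus its own callback
    callback null, value * 2

  async.waterfall [addOne, double], (err, result) ->
    # err is null and result is 2; a non-null err from either task
    # would have jumped straight here instead
    console.log result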

My first pass was to duplicate the behavior of my original code, but to do so under the async model. That means lots of smaller functions; I also introduced an extra step between getting the opened file and streaming its contents to the browser. The extra step is intended for later, where ImageMagick will get threaded in.

The code, despite the extra step, was quite readable:
  app.get "/images/review-thumbnail/:id", (req, res) ->

    id = req.params.id

    openFile = (callback) ->
      store = new GridStore mongoose.connection.db, new ObjectID(id), null, "r"

      store.open callback

    readFileContents = (file, callback) ->

      if file.eof()
        return callback(new ErrorResponse 404, "Image #{id} not found")

      res.contentType file.contentType

      callback null, file.stream(true)

    streamToClient = (stream, callback) ->

      stream.pipe res

      callback null

    errorCallback = (err) ->
      if err
        sendErrorResponse res, err

    async.waterfall [ openFile, readFileContents, streamToClient], errorCallback

My style is to assign each step to a local variable; openFile kicks off the process. Once the file has been retrieved from MongoDB, the readFileContents task will be invoked ... unless there's an error, in which case errorCallback gets invoked immediately.

Inside readFileContents we convert the file to a stream with file.stream(true) (the true means to automatically close the stream once all of the file contents have been read from GridFS).

streamToClient comes next; it takes that stream and pipes it down to the browser via the res (response) object.

So, although it's now broken up into more small functions, the logic is the same, as expressed on the very last line: open the file, read its contents as a stream, stream the data down to the client.

However, when I started testing this, before moving on to the image scaling step, it no longer worked. The image data was corrupted. I did quite a bit of thrashing: adding log messages, looking at library source, guessing, and experimenting (and I did pine for a real debugger!).

Eventually, I realized it came down to this bit of code from the async module:
    async.waterfall = function (tasks, callback) {
        callback = callback || function () {};
        if (!tasks.length) {
            return callback();
        }
        var wrapIterator = function (iterator) {
            return function (err) {
                if (err) {
                    callback(err);
                    callback = function () {};
                }
                else {
                    var args = Array.prototype.slice.call(arguments, 1);
                    var next = iterator.next();
                    if (next) {
                        args.push(wrapIterator(next));
                    }
                    else {
                        args.push(callback);
                    }
                    async.nextTick(function () {
                        iterator.apply(null, args);
                    });
                }
            };
        };
        wrapIterator(async.iterator(tasks))();
    };

The function (err) returned by wrapIterator is the callback function passed to each task; notice that once it decides what to do, it defers the actual execution, via async.nextTick(), until the "next tick".

The root of the problem was simply that the "next tick" was a little too late. By the time the next tick came along, and streamToClient got invoked, the first chunk of data had already been read from MongoDB ... but since the call to pipe() had not executed yet, it was simply discarded. The end result was that the stream to the client was missing a chunk at the beginning, or even entirely empty.
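
One way around it (a sketch of my own, under the assumption that the pipe just needs to be attached before the stream can emit its first chunk) is to collapse the last two tasks, so the pipe is set up in the same task, and the same tick, that creates the stream:

  streamFileContents = (file, callback) ->

    if file.eof()
      return callback(new ErrorResponse 404, "Image #{id} not found")

    res.contentType file.contentType

    # Pipe synchronously -- before async defers to the next tick -- so the
    # first chunk read from GridFS has somewhere to go and isn't discarded.
    file.stream(true).pipe res

    callback null

  async.waterfall [openFile, streamFileContents], errorCallback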

So that's our Leaky Abstraction for today: what looked like an immediate callback was deferred just enough to change the overall behavior. And a reminder that, in Node, anything that can be deferred will be deferred, since that makes the overall application that much zippier.

Published at DZone with permission of Howard Lewis Ship, author and DZone MVB. (source)
