Spiria logo.

Your Own RFID Gateway Written in Node.js

August 29, 2014.

In our previous post we talked about a general architecture of an RFID-based asset tracking system and, specifically, about an API server in the heart of such a system. Now we are going to take a look at the remote nodes collecting the data that comes from RFID readers and sending this data through some API to the database. Let's create our proper RFID gateway server software with Node.js.

Architecture

Hardware

The hardware part didn't change a lot since the previous article, though our diagram will be scoped on the peripherals now:

Software

On the software side, our goal will be to leverage asynchronous design patterns forming the basis of Node.js API: event emitters and streams.

Since we can (and most likely would in the real life) have multiple RFID readers connected to the same gateway, we have to be able to consume data from several reader modules independently. Moreover, these modules can correspond / talk to the different types of hardware while reporting to the same gateway. Finally, in order to unload the main process and take advantage of multi- processor / core architectures we are putting those reader modules in separate child processes. These processes are piping the data to the Listener module of the parent process, which in its turn analyses and transforms (if needed) the data and pipes it further to theUploader module. The latter is the one transmitting the results to the cloud via HTTP or some other protocol.

Implementation

We are assuming further that you already have Node.js and npm installed and node / npm commands are accessible. Let's create a new package for our RFID gateway by typing npm init once we are in the target folder. After answering a few questions (just use the defaults for now), we will have our package.json created. Next, let's assume we have a file config.json containing some configuration for our gateway in the root folder of our package. Then the entry point for our application (./index.js) can look like this (the complete code of the implementation is available here):

'use strict';

/**
 * Dependencies
 */
var config = require('./config.json');
var Gateway = require('./lib/gateway');

module.exports = new Gateway(config);

Obviously, this is a bit simplyfied, and in real life we should always check if the configuration file exists, etc., but here we will omit these verifications for sake of simplicity. Also, at this point we don't even care about the exact content of config.json, as long as it contains all the data necessary to connect to our RFID readers (e.g. ports and reader types) and to transmit the information to the cloud (e.g. endpoints, authentication credentials, etc.).

Gateway Module

Gateway module is the core of our gateway's master process. Architectured as an event emitter, it handles the system signals, deployment-related events, creation of scanning and uploading objects, etc. Here is how it looks like (this code goes to ./lib/gateway.js):

'use strict';

/**
 * Dependencies
 */
var EventEmitter = require('events').EventEmitter;
var util = require('util')
var async = require('async');
var _ = require('lodash');
var Uploader = require('./uploader');
var Listener = require('./listener');
var defaults = require('./defaults.json');

module.exports = Gateway;

// Gateway module
function Gateway(config) {
  if (!(this instanceof Gateway))
    return new Gateway(config);

  EventEmitter.call(this);

  // Configuration validation, defaults, etc.
  config = config || {};
  _.defaults(config, defaults);

  // Instantiate a listener and an uploader
  var uploader = new Uploader(config['uploader']);
  var listener = new Listener(config['listener']);

  // Deployment-related events: online message
  async.each([listener, uploader], function(stream, cb) {
    stream.on('error', function(err) {
      console.error(err);
      process.send('shutdown');
    });

    stream.on('ready', function() {
      cb();
    });
  }, function(err) {
    if (err) {
      console.error(err);      
      process.send('shutdown');
    }

    process.send('online');
  });

  // Deployment-related events: shutdown message
  process.on('message', function(message) {
    if (message === 'shutdown') {
      async.each([listener, uploader], function(stream, cb) {
        stream.emit('shutdown');

        stream.on('close', function() {
          cb();
        });
      }, function(err) {        
        process.exit((err) ? 1 : 0);
      });
    }
  });

  // Let's get it started
  listener.pipe(uploader);
}

util.inherits(Gateway, EventEmitter);

Some key sections of this constructor include:

  • configuration validation (feel free to validate more thoroughly based on your specific case), applying defaults;
  • creation of Listener and Uploader instances;
  • emitting online event once both listener and uploader are ready (this is necessary in order to provide zero-downtime deployment, with Naught, for example);
  • listening for the shutdown message from our deployer in order to gracefully close all the connections and stop the process.

Finally, thanks to stream-based architecture we used, starting it all becomes as simple as piping our listener to our uploader:

listener.pipe(uploader);

Now, let's take a closer look at the components responsible for reading the data and transmitting it upstream.

Listener

As we saw on the software architecture diagram, Listener module object is a transform stream, which means it is writable and readable (and can be piped from and to). Since the current version of Node.js streams (as of v0.10, so called "streams2") operates on Strings and Buffers by default, we need to put our Listener stream in the object mode if we want it to emit generic JavaScript values. So, we have to call its parent constructor like this:

Transform.call(this, {objectMode: true});

Next, we need to fork child processes for different RFID readers attached to our gateway (as listed in the configuration file). Using async library, we make sure all the devices are activated (see attach function) and then emit ready event on our Listener object. The whole module we will have in lib/listener.js is listed below:

'use strict';

/**
 * Dependencies
 */
var Transform = require('stream').Transform;
var util = require('util');
var fork = require('child_process').fork;
var async = require('async');
var split2 = require('split2');

module.exports = Listener;

function Listener(config) {
  var that = this;

  config = config || {};
  config['devices'] = config['devices'] || [];

  Transform.call(this, {objectMode: true});

  async.each(config['devices'], function(deviceConfig, cb) {
    that.attach(deviceConfig, function(err) {  
      cb(err);
    });
  }, function(err) {
    if (err)
      return that.emit('error', err);

    that.emit('ready');
  });
}

util.inherits(Listener, Transform);

Listener.prototype._transform = function(message, enc, next) {

  // We can do some preprocessing here

  this.push(message, enc);

  next();
};

Listener.prototype.attach = function(config, done) {
  var that = this;

  var device = fork(config.module, [], {
    silent: true
  });

  device.send({
    event: 'connect',
    config: config
  });

  device.on('message', function(message) {
    switch (message.event) {
      case 'ready':
        done();
        break;
      case 'error':
        that.emit('error', message.error);
        break;
      default:
        break;
    }
  });

  device.stdout.pipe(split2(JSON.parse)).pipe(this);
};

Here, attach method receives a configuration object containing all the information necessary to connect to the reader and start scanning, including the module property, which is a path to the reader driver module to be forked. Note, that we are forking this child process with silent option enabled. It means that stdout stream of this process will be piped to device.stdout, which, in its turn, we are piping to our very Listener stream through another transform stream, split2(JSON.parse). What the latter does, it takes the chunks coming from this stdout stream (which is not in the object mode, i.e. can be sending fragmented output), assembles them into complete lines and parses into JSON objects that are being pushed further. This processing procedure defines the following principles that have to be used while architecturing reader modules:

  • reader module has to print the scanning results as stringified JSON objects, one per line, to its stdout;
  • if an error occurres, the reader process should send a message object with event property equal toerror (this message will be transmitted upstream to the gateway module).

Also, as you can notice, _transform function (necessary for implementation of a transform stream) doesn't do a lot in our case simply pushing further all the messages it receives, but at the same time it could be used to do some preprocessing (and/or error handling). For example, we could cache the messages, analyze the new input and push it only in case of some changes we are tracking.

Uploader

Finally, after all these manipulations we have our data flowing into the Uploader object, which is a writable stream with the object mode enabled, which means it receives JavaScript objects sent by the reader driver modules. Since the implementation of a specific approach to sending the data to remote servers falls beyond the scope of this article, we will just log the incoming messages to the console. The code below goes to./lib/uploader.js:

'use strict';

/**
 * Dependencies
 */
var Writable = require('stream').Writable;
var util = require('util');

module.exports = Uploader;

function Uploader(config) {
  Writable.call(this, {objectMode: true});
}

util.inherits(Uploader, Writable);

Uploader.prototype._write = function(message, enc, next) {
  console.info('Sending: ' + JSON.stringify(message));

  // Sending to the cloud here

  next();
};

Reader Modules

One of the most important subjects we haven't looked into yet is the reader modules. The main reason is the fact that any specific implementation of such a module would rely heavily on the hardware being used. In other words, since there is no standard approach to the way RFID readers connect to gateways, every reader manufacturer (or even devices from the same manufacturer) requires its own reader module to be written. We are planning to examine some examples of such modules in greater depth in our future posts, but for now let's just use the mockout package to simulate messages from a reader module.

Putting It All Together

There is a couple of last things we need to do before we will be able to start our gateway. First, we need an example config file. This will go to ./config.json:

{
  "listener": {
    "devices": [
      {
        "path": "/some/path1",
        "module": "./node_modules/mockout"
      },
      {
        "path": "/some/path2",
        "module": "./node_modules/mockout"
      },
      {
        "path": "/some/path3",
        "module": "./node_modules/mockout"
      }
    ]
  },
  "uploader": {
  }
}

Also, let's put default configuration values in ./config/defaults.json:

{
  "listener": {
    "devices": []
  },
  "uploader": {}
}

Finally, we have to install the packages used in the implementation of our RFID gateway:

npm install --save async@0.9.0 lodash@2.4.1 mockout@0.0.2 split2@0.1.2

This will also add listed packages to the dependencies section of our ./package.json file. And that is it. If we run node index.js now, we will see some output similar to the following:

Sending: {"sortOf":"anObject","initializedWith":{"event":"connect","config":{"path":"tmr:///dev/ttyACM2","module":"./node_modules/mockout"}}}
Sending: {"sortOf":"anObject","initializedWith":{"event":"connect","config":{"path":"tmr:///dev/ttyACM1","module":"./node_modules/mockout"}}}
Sending: {"sortOf":"anObject","initializedWith":{"event":"connect","config":{"path":"tmr:///dev/ttyACM3","module":"./node_modules/mockout"}}}
Sending: {"sortOf":"anObject","initializedWith":{"event":"connect","config":{"path":"tmr:///dev/ttyACM2","module":"./node_modules/mockout"}}}
Sending: {"sortOf":"anObject","initializedWith":{"event":"connect","config":{"path":"tmr:///dev/ttyACM1","module":"./node_modules/mockout"}}}
Sending: {"sortOf":"anObject","initializedWith":{"event":"connect","config":{"path":"tmr:///dev/ttyACM3","module":"./node_modules/mockout"}}}
...

This means that we have three child processes piping the data from RFID readers through our Listener to the Uploader, which is ready to send it wherever we want.

Next Steps

So, does this implementation cover completely the software for peripheral nodes of our RFID system? Or course, no. It provides a robust way to organize the flow of data from the hardware to the point when it's ready to be sent to remote servers (which is pretty powerful result for only about two hundred lines of code), but specific implementation of reader modules (for different RFID manufacturers) and uploading procedures (which, for example, can use special protocols like MQTT or CoAP) worth a closer look and can be a topic of our further articles.