
Streaming System Design Doc

Will Engler edited this page Jun 29, 2017 · 6 revisions

This document describes how observations from the Array of Things (and, hopefully, eventually other sensor networks) get into Plenario and out to users. It covers system internals and assumes the reader is familiar with the API being implemented. For that background, please refer to the public API docs on Plenario's sensor network support.

Contents

  1. Overview
  2. Beehive
  3. Publish Lambda
  4. Resolve Lambda
  5. Socket.io Server
  6. Apiary
  7. Glossary

Overview

[Box-and-arrow diagram of the system]

Beehive

How it works:

Beehive is a Flask application that communicates directly with nodes in the field. It runs outside the cloud, so we need an interface that moves sensor observations from Beehive into Plenario. We want a mailbox that has minimal downtime and that a consuming application can read records from at its own pace. Kinesis fits the bill: Beehive uses boto to push observations, one at a time, to a Kinesis stream that the Plenario team provisions (the "Publication Stream").
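As a rough illustration, the push from Beehive might look like the Python sketch below. It assumes boto3 (the current AWS SDK for Python; Beehive may use the older boto library), a hypothetical stream name, and node_id as the partition key; none of these details are confirmed by this document. The client is passed in so the function can be exercised without AWS credentials.

```python
import json
from typing import Any, Protocol


class KinesisClient(Protocol):
    """Anything with boto3's Kinesis put_record keyword arguments."""

    def put_record(self, *, StreamName: str, Data: bytes, PartitionKey: str) -> dict: ...


# Hypothetical name; the real Publication Stream is provisioned by the Plenario team.
PUBLICATION_STREAM = "plenario-publication-stream"


def publish_observation(client: KinesisClient, record: dict[str, Any],
                        stream_name: str = PUBLICATION_STREAM) -> dict:
    """Push a single Beehive observation to the Publication Stream."""
    return client.put_record(
        StreamName=stream_name,
        # Kinesis payloads are raw bytes, so encode the observation as JSON.
        Data=json.dumps(record).encode("utf-8"),
        # Assumption: partition by node so one node's records land on one shard in order.
        PartitionKey=record["node_id"],
    )
```

In production the client would be created with `boto3.client("kinesis")` using the access key described under Scaling Concerns.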

Output:

A Beehive record is a collection of values recorded by one sensor at one time. It is formatted like:

{
  "datetime": "2017-04-07 17:50:51",
  "network": "array_of_things_chicago",
  "meta_id": 1,
  "data": {
    "orient_y": 1,
    "orient_z": -1,
    "accel_z": 30,
    "orient_x": 3,
    "accel_y": 981,
    "accel_x": -10
  },
  "sensor": "bmi160",
  "node_id": "0000001e0610b9e7"
}

Notes on the fields:

  • The keys in the data object are what we call "beehive nicknames." Beehive does not use Plenario's concept of features of interest, so Plenario maintains metadata (through Apiary) that maps beehive nicknames to features of interest. The examples that follow show what this translation looks like.
  • meta_id is essentially unimplemented on both the Beehive and Plenario sides. It is intended to point to the configuration of the network at a moment in time. Plenario only maintains what a network looks like "now": if a node was 15 feet off the ground for 3 months and then moved 5 feet higher, metadata queries about that node will say it is 20 feet off the ground. The idea is that you could take a record's meta_id, look up what the network looked like when that id applied, and learn that the node was 15 feet off the ground.
  • network, sensor, and node_id are expected to match network configuration registered through Apiary.
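A consumer could check those expectations before accepting a record. The sketch below is hypothetical: it models the Apiary registry as a plain dictionary from network to node to the set of sensors on that node, whereas in Plenario this metadata lives in Postgres. Comparisons are exact, so identifiers are assumed to already be in a consistent case.

```python
from typing import Any

REQUIRED_FIELDS = ("datetime", "network", "meta_id", "data", "sensor", "node_id")

# Hypothetical snapshot of Apiary metadata: network -> node_id -> sensor names.
Registry = dict[str, dict[str, set[str]]]


def validation_errors(record: dict[str, Any], registry: Registry) -> list[str]:
    """Return the problems found in a Beehive record (an empty list means it looks valid)."""
    missing = [f for f in REQUIRED_FIELDS if f not in record]
    if missing:
        return [f"missing field: {f}" for f in missing]
    nodes = registry.get(record["network"])
    if nodes is None:
        return [f"unknown network: {record['network']}"]
    sensors = nodes.get(record["node_id"])
    if sensors is None:
        return [f"unknown node: {record['node_id']}"]
    if record["sensor"] not in sensors:
        return [f"sensor {record['sensor']} not registered on node {record['node_id']}"]
    return []
```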

Scaling Concerns:

We have created a one-off access key that lets Beehive push to that stream and given it to the Waggle maintainer. For now there is just one Beehive, at Argonne. To let multiple Beehives and non-Waggle sensor networks publish observations, we will likely need to automate the process of creating credentials. Once multiple network maintainers are pushing to the same stream, we will also want to authenticate each sender.

Publish Lambda

How it works:

Publish is a Node.js Lambda function that reads from the Publication Stream. Each invocation receives a batch of 1 to 100 records. It normalizes them and sends them down two paths: into Firehose, which copies them into Redshift, and, through Redis, to all Socket.io servers.
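The Publish lambda itself is Node.js; the Python sketch below only illustrates the first step any Kinesis-triggered Lambda function performs, unpacking the batch. Lambda delivers each Kinesis record with its payload base64-encoded under `Records[i].kinesis.data`. The sketch assumes every payload is a JSON-encoded Beehive observation.

```python
import base64
import json
from typing import Any


def decode_kinesis_batch(event: dict[str, Any]) -> list[dict[str, Any]]:
    """Turn a Kinesis-triggered Lambda event into a list of observation dicts."""
    observations = []
    for record in event["Records"]:
        # Kinesis payloads arrive base64-encoded inside the Lambda event.
        payload = base64.b64decode(record["kinesis"]["data"])
        observations.append(json.loads(payload))
    return observations
```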

Output:

It converts the datetime field to ISO 8601, lower-cases the identifiers, and renames node_id to node. The JSON sent to the socket servers looks like:

{
  "datetime": "2017-04-07T17:50:51",
  "network": "array_of_things_chicago",
  "meta_id": 1,
  "data": {
    "orient_y": 1,
    "orient_z": -1,
    "accel_z": 30,
    "orient_x": 3,
    "accel_y": 981,
    "accel_x": -10
  },
  "sensor": "bmi160",
  "node": "0000001e0610b9e7"
}
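The normalization step can be sketched in Python as follows. It assumes Beehive always sends datetimes in the `YYYY-MM-DD HH:MM:SS` form shown in the Beehive example, and it treats network, sensor, and node as the identifiers to lower-case. The real lambda is written in Node.js and may differ in detail.

```python
from datetime import datetime
from typing import Any

BEEHIVE_DATETIME = "%Y-%m-%d %H:%M:%S"  # assumed Beehive timestamp format


def normalize(record: dict[str, Any]) -> dict[str, Any]:
    """Convert a raw Beehive record into the shape sent to the socket servers."""
    timestamp = datetime.strptime(record["datetime"], BEEHIVE_DATETIME)
    return {
        "datetime": timestamp.isoformat(),  # e.g. "2017-04-07T17:50:51"
        "network": record["network"].lower(),
        "meta_id": record["meta_id"],
        "data": record["data"],
        "sensor": record["sensor"].lower(),
        "node": record["node_id"].lower(),  # node_id is renamed to node
    }
```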

For Firehose, it formats each record as a CSV row, like:

array_of_things_chicago,0000001e0610b9e7,2017-04-07T17:50:51,1,bmi160,"{""orient_y"":1,""orient_z"":-1,""accel_z"":30,""orient_x"":3,""accel_y"":981,""accel_x"":-10}"
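Python's csv module produces exactly this quoting: the data column contains commas and double quotes, so it is wrapped in double quotes and its inner quotes are doubled. The sketch below assumes the column order in the example row (network, node, datetime, meta_id, sensor, data) and takes a record in the normalized shape that Publish sends to the socket servers.

```python
import csv
import io
import json
from typing import Any


def to_firehose_row(record: dict[str, Any]) -> str:
    """Format a normalized observation as one newline-terminated CSV line."""
    buffer = io.StringIO()
    # Firehose concatenates records as-is, so each row carries its own newline.
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow([
        record["network"],
        record["node"],
        record["datetime"],
        record["meta_id"],
        record["sensor"],
        # Compact JSON (no spaces after separators), matching the example row.
        json.dumps(record["data"], separators=(",", ":")),
    ])
    return buffer.getvalue()
```

For the normalized bmi160 record, this returns the example row above followed by a newline.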

Scaling Concerns:

We are working on estimating how long this function takes to execute under normal conditions. Still to be written: when and how to provision extra shards.
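One input to the shard question is the per-shard write limit of Kinesis Data Streams: 1,000 records per second or 1 MiB per second, whichever is reached first. A back-of-the-envelope shard count could be computed as below. The `headroom` factor is an illustrative knob, and the Lambda execution time mentioned above is a separate constraint that this sketch does not model.

```python
import math

# Kinesis Data Streams write limits per shard.
MAX_RECORDS_PER_SHARD = 1_000      # records per second
MAX_BYTES_PER_SHARD = 1024 * 1024  # 1 MiB per second


def shards_needed(records_per_second: float, avg_record_bytes: float,
                  headroom: float = 1.0) -> int:
    """Minimum shard count for a write load, scaled by an optional headroom factor."""
    by_count = records_per_second / MAX_RECORDS_PER_SHARD
    by_bytes = records_per_second * avg_record_bytes / MAX_BYTES_PER_SHARD
    # Whichever limit binds first determines the count; always keep at least one shard.
    return max(1, math.ceil(max(by_count, by_bytes) * headroom))
```

With made-up traffic of 2,500 records per second at about 300 bytes each, the record-count limit binds and `shards_needed(2500, 300)` returns 3.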

Resolve Lambda

How it works:

Resolve is a Python Lambda function that organizes sensor observations. Once a minute, Firehose dumps observations into a staging table in Redshift. Resolve moves the observations waiting in the staging table into the correct observation tables (for example, temperature observations go into the temperature table).
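One way to picture the routing is as an `INSERT ... SELECT` per feature, pulling individual nicknames out of the staged JSON with Redshift's `json_extract_path_text` function. The sketch below is hypothetical: the staging table name, its column names, and the one-table-per-feature layout are assumptions for illustration, not Resolve's actual schema.

```python
def build_resolve_sql(feature: str, sensor: str, columns: dict[str, str],
                      staging_table: str = "staging_observations") -> str:
    """Build SQL that copies one sensor's readings of a feature from staging.

    columns maps property column names (e.g. "x") to beehive nicknames
    (e.g. "orient_x"). Identifiers and the sensor name come from Apiary
    metadata rather than user input, so they are interpolated directly.
    """
    props = sorted(columns)
    extracts = ",\n       ".join(
        # Each nickname is pulled out of the JSON data column and cast to a number.
        f"json_extract_path_text(data, '{columns[p]}')::float AS {p}" for p in props
    )
    return (
        f"INSERT INTO {feature} (node_id, datetime, meta_id, sensor, {', '.join(props)})\n"
        f"SELECT node_id, datetime, meta_id, sensor,\n"
        f"       {extracts}\n"
        f"FROM {staging_table}\n"
        f"WHERE sensor = '{sensor}';"
    )
```

A real run would also need to clear or mark the staged rows it has moved so they are not resolved twice; that bookkeeping is omitted here.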

Socket.io Server

How it works:

The socket server is one or more Node.js servers running in an Elastic Beanstalk group. It serves WebSocket connections with the help of the Socket.io library. The Beanstalk group runs behind an AWS Application Load Balancer with sticky sessions, so each client stays connected to the instance it first reached. As a result, one socket server instance does not need to know about another instance's subscribers.

Each instance subscribes to the Redis instance that the Publish lambda publishes to and queries Postgres for sensor network metadata. The server takes query arguments that allow subscribers to specify the subset of a sensor network they're interested in.
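The per-subscriber filtering can be sketched as a predicate over the observation attributes the server emits (the format shown in this component's Output section). The argument names `network`, `nodes`, `sensors`, and `features` are assumptions for illustration; the public API docs define the real query arguments. An omitted filter is treated as "match everything", and a network is assumed to be required.

```python
from typing import Any, Optional


def matches(attributes: dict[str, Any], network: str,
            nodes: Optional[set[str]] = None,
            sensors: Optional[set[str]] = None,
            features: Optional[set[str]] = None) -> bool:
    """Decide whether one observation should be sent to a subscriber."""
    if attributes["network"] != network:
        return False
    # Each optional filter only applies when the subscriber supplied it.
    for value, allowed in ((attributes["node"], nodes),
                           (attributes["sensor"], sensors),
                           (attributes["feature"], features)):
        if allowed is not None and value not in allowed:
            return False
    return True
```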

Output:

When the server receives a batch of records from the Publish lambda, it breaks them into individual observations. Continuing the Publish lambda's example output, it would produce the following observations:

[
  {
    "type": "sensorObservations",
    "attributes": {
      "sensor": "bmi160",
      "node": "0000001e0610b9e7",
      "meta_id": 1,
      "network": "array_of_things_chicago",
      "datetime": "2017-04-07T17:50:51",
      "feature": "orientation",
      "properties": { "x": 3, "y": 1, "z": -1 }
    }
  },
  {
    "type": "sensorObservations",
    "attributes": {
      "sensor": "bmi160",
      "node": "0000001e0610b9e7",
      "meta_id": 1,
      "network": "array_of_things_chicago",
      "datetime": "2017-04-07T17:50:51",
      "feature": "acceleration",
      "properties": { "x": -10, "y": 981, "z": 30 }
    }
  }
]

Using the sensor metadata stored in Postgres (which maps beehive nicknames to Plenario features of interest), the socket server recognizes that this record contains two distinct features, acceleration and orientation, and splits them apart. So a user subscribed only to orientation receives only that observation. The server also enforces JSON API-like formatting on observations.
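The splitting step might look like the following Python sketch (the real server is Node.js). The nickname table is hypothetical, filled in only for the bmi160 example; in Plenario it comes from the metadata in Postgres. Dropping nicknames that have no mapping is an assumption about how unknown data should be handled.

```python
from typing import Any

# Hypothetical metadata: (sensor, beehive nickname) -> (feature of interest, property).
NICKNAMES: dict[tuple[str, str], tuple[str, str]] = {
    ("bmi160", "orient_x"): ("orientation", "x"),
    ("bmi160", "orient_y"): ("orientation", "y"),
    ("bmi160", "orient_z"): ("orientation", "z"),
    ("bmi160", "accel_x"): ("acceleration", "x"),
    ("bmi160", "accel_y"): ("acceleration", "y"),
    ("bmi160", "accel_z"): ("acceleration", "z"),
}


def split_record(record: dict[str, Any],
                 nicknames: dict[tuple[str, str], tuple[str, str]]) -> list[dict[str, Any]]:
    """Split one normalized record into one observation per feature of interest."""
    features: dict[str, dict[str, Any]] = {}
    for nickname, value in record["data"].items():
        target = nicknames.get((record["sensor"], nickname))
        if target is None:
            continue  # unmapped nickname: dropped in this sketch
        feature, prop = target
        features.setdefault(feature, {})[prop] = value
    # Features appear in the order their first nickname was seen in the data object.
    return [
        {
            "type": "sensorObservations",
            "attributes": {
                "sensor": record["sensor"],
                "node": record["node"],
                "meta_id": record["meta_id"],
                "network": record["network"],
                "datetime": record["datetime"],
                "feature": feature,
                "properties": props,
            },
        }
        for feature, props in features.items()
    ]
```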

Scaling Concerns:

As I write this, we are working on load testing to determine how we should set the scaling trigger for the Beanstalk group, which instance type is optimal, and roughly how many subscribers per instance we can expect to handle. Another big outstanding feature is an API key system to manage traffic.

Apiary

How it works:

Apiary is a Django app that maintains sensor network metadata in the Plenario Postgres database. Because sensor metadata is pretty straightforward (Networks have many nodes, nodes have many sensors...), this app can also be straightforward.
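The hierarchy is simple enough to model in a few lines. Apiary itself uses Django models; the plain Python dataclasses below are only a hypothetical stand-in showing the one-to-many shape, with a lookup that answers "which sensors does this node carry?". Field names are illustrative.

```python
from dataclasses import dataclass, field


@dataclass
class Sensor:
    name: str  # e.g. "bmi160"
    nicknames: dict[str, str] = field(default_factory=dict)  # beehive nickname -> feature


@dataclass
class Node:
    node_id: str
    sensors: list[Sensor] = field(default_factory=list)  # a node has many sensors


@dataclass
class Network:
    name: str
    nodes: list[Node] = field(default_factory=list)  # a network has many nodes

    def sensors_on(self, node_id: str) -> list[str]:
        """Names of the sensors on one node, or [] if the node is unknown."""
        for node in self.nodes:
            if node.node_id == node_id:
                return [s.name for s in node.sensors]
        return []
```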

Scaling Concerns:

Right now, the Beehive administrator tells Jesse over email or Slack when new nodes are coming online, and Jesse adds the nodes through the Apiary admin console. This is manageable during the initial ramp-up, but far from ideal. We're working with the Beehive team to install triggers that will call the Apiary API whenever Beehive has state changes to communicate.
