
Getting sensor data into a Google Cloud SQL database

In this example we will use a Data Connector to integrate with Google Cloud by forwarding events to a Cloud Function.

When the data has been received by the Cloud Function, it can in turn be fed into any of Google’s vast selection of databases, event buses, stream-processing and machine-learning tools, to name a few.

Before you begin

You need to have completed the Google Cloud integration guide.

Setup

First you need to create a project, a database, some tables in the database and a Cloud Function.

Create a Project

  1. Log in to the Google Cloud Platform Console
  2. Open the dropdown menu in the top left corner and click Getting started
  3. Click the tile which says Create an empty project
  4. Give it the name “Disruptive Integration”
  5. Select the Billing Account (“My Billing Account” for free trials)
  6. Leave everything else as the default and click Create

Make sure you have the new Project selected by clicking on the drop-down to the left of the top search field. Locate and select the “Disruptive Integration” project.

Create a Database

  1. Open the dropdown menu in the top left corner and click SQL
  2. Click Create instance
  3. Choose PostgreSQL
  4. Fill in
    • Instance ID: “event-database”
    • Default user password: pick one and remember it for later
  5. Click Create
  6. Wait for the database to deploy. After creation you have a default user called “postgres”, with the password you chose. You also have an empty database called “postgres” inside the new PostgreSQL instance.
  7. When the database has been deployed, click on the name of the database to open the overview.
  8. On the overview, locate the Instance connection name (something like disruptive-integration:us-central1:test) and note this down as you will need it later.

Create tables

Next, we need to create the database tables where the events will be stored.

Data Connectors have an at-least-once delivery guarantee, which means the same event may occasionally be delivered more than once. To prevent duplicate rows, we make the event_id column unique (see the note after the SQL queries below).

  1. Start Cloud Shell by clicking the Cloud Shell icon to the right of the search bar. If prompted, click “START CLOUD SHELL”.
  2. At the Cloud Shell prompt, connect to your SQL instance using the following command: gcloud sql connect event-database --user=postgres
  3. After a little while, you should get the prompt Connecting to database with SQL user [postgres]. Password for user postgres: presented back to you. Enter the default PostgreSQL password you configured earlier
  4. You should now see the prompt postgres=>. For each statement (starting with CREATE TABLE) in the “SQL queries” below, copy it, paste it into the prompt and hit enter
  5. You should get a line back saying CREATE TABLE each time (a condensed example session is shown below)
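
Put together, creating the first table should look roughly like this (the CREATE TABLE statement is abbreviated here):

$ gcloud sql connect event-database --user=postgres
Connecting to database with SQL user [postgres].
Password for user postgres:
postgres=> CREATE TABLE Touch_events ( ... );
CREATE TABLE
postgres=>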

SQL queries

CREATE TABLE Touch_events (
  event_id char(20) NOT NULL UNIQUE,
  device_id char(23) NOT NULL,
  timestamp char(30) NOT NULL
);

CREATE TABLE Temperature_events (
  event_id char(20) NOT NULL UNIQUE,
  device_id char(23) NOT NULL,
  timestamp char(30) NOT NULL,
  temperature float NOT NULL
);

CREATE TABLE Prox_events (
  event_id char(20) NOT NULL UNIQUE,
  device_id char(23) NOT NULL,
  timestamp char(30) NOT NULL,
  state varchar(15) NOT NULL
);
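
Note on duplicates: because the Data Connector may occasionally deliver the same event twice, inserting a duplicate event_id will fail on the UNIQUE constraint, and the Cloud Function below will answer with an error so that the event is retried. If you would rather have the database silently ignore redelivered events, the INSERT statements used later in the Cloud Function can be extended with PostgreSQL’s ON CONFLICT clause (available in PostgreSQL 9.5 and later). A sketch for the temperature insert:

-- Variant of the parameterized insert used later in the Cloud Function.
-- Redeliveries of an already-stored event_id are silently skipped.
INSERT INTO Temperature_events (event_id, device_id, timestamp, temperature)
VALUES ($1, $2, $3, $4)
ON CONFLICT (event_id) DO NOTHING;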

You can now close the shell by clicking the X in the top right corner of the shell window.

Change the Cloud Function code

Finally, we will modify the previously created Cloud Function (set up as part of “Before you begin”) to listen for sensor events and insert them into the database.

There are four steps to receiving the sensor data and saving it into the database:

  1. Add a library for connecting to the database
  2. Get the event data from the request body, and make the query based on event type
  3. Connect to the database and execute a SQL command
  4. Send a response to the Data Connector to let it know the message has been safely received. If an error occurs, send an error code to the Data Connector to let it know to retry the message again later.

Before continuing, navigate to the Cloud Function’s code editor:

  1. Open the dropdown menu in the top left corner and go to Cloud Functions.
  2. Click on the name of the Cloud Function to open it.
  3. Press the Edit button.

1. Add library

We need to add a library called pg.

Open package.json. Edit it so it looks something like this.

{
  "name": "sample-http",
  "version": "0.0.1",
  "dependencies": {
    "pg": "7.4.3"
  }
}

2. Get event data and make SQL query

Go back to index.js, remove the existing code and replace it with the following empty function:

    /**
     * Receive sensor events and save to PostgreSQL database
     *
     * @param {!express:Request} req HTTP request context.
     * @param {!express:Response} res HTTP response context.
     */
    exports.eventReceiver = (req, res) => {

    };

Change the Function to execute, below the editor, to be eventReceiver.

Next, add this at the start of the eventReceiver function:


    // Get event data
    var eventId = req.body.event.eventId;
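    // targetName has the form "projects/<project-id>/devices/<device-id>";
    // substring(38) strips the fixed-length prefix (this assumes a
    // 20-character project ID), leaving just the device ID.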
    var deviceId = req.body.event.targetName.substring(38);
    var timestamp = req.body.event.timestamp;
    var eventType = req.body.event.eventType;

    // Create the SQL query based on event type
    var text;
    var parameters;

    switch (eventType) {

        case "temperature":
            text = "INSERT INTO Temperature_events (event_id, device_id, timestamp, temperature) " +
                   "VALUES ($1, $2, $3, $4);";
            var temperature = req.body.event.data.temperature.value;
            parameters = [eventId, deviceId, timestamp, temperature];
            break;

        case "touch":
            text = "INSERT INTO Touch_events (event_id, device_id, timestamp) " +
                   "VALUES ($1, $2, $3);";
            parameters = [eventId, deviceId, timestamp];
            break;

        case "objectPresent":
            text = "INSERT INTO Prox_events (event_id, device_id, timestamp, state) " +
                   "VALUES ($1, $2, $3, $4);";
            var state = req.body.event.data.objectPresent.state;
            parameters = [eventId, deviceId, timestamp, state];
            break;

        default:
            console.warn("Unsupported event type received, check Data Connector. EventType = " + eventType);
            res.status(200).end(); // Return OK to prevent the Data Connector from resending
            return;
    }


    // Log for debugging and information
    console.log("Update: " + deviceId + ", " + eventType + ", " + timestamp);
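
For reference, the Data Connector delivers a JSON body containing at least the fields read above. A hypothetical temperature event (the IDs and values here are made up, and the targetName placeholders stand in for your own project and device IDs) might look like this:

{
  "event": {
    "eventId": "beimle6gse4g00838qlg",
    "targetName": "projects/<project-id>/devices/<device-id>",
    "eventType": "temperature",
    "timestamp": "2018-09-21T22:03:36.354987851Z",
    "data": {
      "temperature": {
        "value": 42.0
      }
    }
  }
}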

3. Connect to database and execute command

Add this code at the top of index.js (before the function):

const pg = require('pg');

// Connect to database, update with your values
const pool = new pg.Pool({
    max: 1,
    host: '/cloudsql/INSTANCE_CONNECTION_NAME',
    user: 'postgres',
    password: 'YOUR_PASSWORD',
    database: 'postgres'
});

Replace INSTANCE_CONNECTION_NAME with the instance connection name of the PostgreSQL that you noted down earlier.

Replace YOUR_PASSWORD with the default database password you set earlier.

Then add this code at the end of the function:

    // Execute query with parameters
    pool.query(text, parameters, (err, result) => {
        if (err) {
            console.error(err);
            res.status(500).end(); // Query failed, send empty error status
        } else {
            res.status(200).end(); // Query succeeded, send empty 200 OK status
        }
    });

4. Send response to Data Connector

pool.query() is an asynchronous function, so the response must be sent from inside its callback. In the code above we call res.status(500).end() to send an error response (telling the Data Connector to retry later), and res.status(200).end() to send an OK response.
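
As a side note, pool.query() in pg also returns a promise when no callback is given, so the same response handling can be written with async/await. A sketch, not part of the steps above:

    // Equivalent response handling using async/await instead of a callback
    exports.eventReceiver = async (req, res) => {
        // ... get event data and build text/parameters exactly as in step 2 ...
        try {
            await pool.query(text, parameters);
            res.status(200).end(); // Query succeeded, send empty 200 OK status
        } catch (err) {
            console.error(err);
            res.status(500).end(); // Query failed, the Data Connector will retry later
        }
    };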


Confirming that the integration works

Everything is now set up for the integration to work.

We will check that the integration is working in three ways:

  1. Check the metrics of the Data Connector
  2. Check the log of the Cloud Function
  3. Check the content of the SQL database

1. Data Connector metrics

Navigate to Studio again and locate the Data Connector created earlier.

If you open up the Data Connector itself, you will be able to see the Activity last 24h overview.

Here you can see that the Data Connector has successfully sent sensor events to the Cloud Function, and the Cloud Function has replied 200 OK, 12 times in the last 24 hours. If the Cloud Function did not reply at all, or replied with something other than the HTTP status code 200 OK, the Error count would increase.

2. Check Cloud Function logs

Google Cloud Platform has a log viewer which shows the log output and return status from the Cloud Function.

Open it by going to the Cloud Functions view, opening the Cloud Function and clicking View logs on the overview page.

Here you should be able to see entries like Function execution took 25 ms, finished with status code: 200, indicating that the Cloud Function ran successfully, without any errors.

3. Check the SQL database content

To check that the database is storing values:

  1. Open the Google Cloud Shell again (like you did to create the tables)
  2. Run the following query to see all values stored in the temperature table
SELECT * FROM Temperature_events;

Provided that you have generated some temperature events (e.g. by waiting 15 minutes, pressing a temperature sensor or using a virtual sensor), the query should return something like the following:

postgres=> SELECT * FROM Temperature_events;
       event_id       |        device_id        |           timestamp            | temperature
----------------------+-------------------------+--------------------------------+-------------
 beimle6gse4g00838qlg | be8odsegse4g0082e4vg    | 2018-09-21T22:03:36.354987851Z |           0
 beimle6gse4g00838qog | be8odsegse4g0082e4vg    | 2018-09-21T22:03:36.917462155Z |          42
 beimleegse4g00838qpg | be8odsegse4g0082e4vg    | 2018-09-21T22:03:37.242580212Z |          42
 beimh66gse4g00838q4g | be8odsegse4g0082e4vg    | 2018-09-21T21:54:32.508543861Z |          34
 beimpqugse4g00838rag | be8odsegse4g0082e4vg    | 2018-09-21T22:12:59.580665268Z |          41
 beimpr6gse4g00838rcg | be8odsegse4g0082e4vg    | 2018-09-21T22:13:00.704640379Z |          10
 beimpqugse4g00838r7g | be8odsegse4g0082e4vg    | 2018-09-21T22:12:59.111525991Z |          22
 beimprmgse4g00838rig | be8odsegse4g0082e4vg    | 2018-09-21T22:13:02.834087266Z |          47
 beimprugse4g00838rl0 | be8odsegse4g0082e4vg    | 2018-09-21T22:13:03.875003723Z |          13
(9 rows)

The same can be done for Touch_events or Prox_events.
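
Because of the UNIQUE constraint on event_id, a quick way to sanity-check that the at-least-once delivery has not produced duplicate rows is a grouping query, which should always return zero rows:

-- Any event_id stored more than once would show up here
SELECT event_id, COUNT(*)
FROM Temperature_events
GROUP BY event_id
HAVING COUNT(*) > 1;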


(optional) Add verification of Secret

Verifying the events sent by the Data Connector in the Cloud Function prevents other people from sending false sensor readings into your database. Using the Secret signature is not required (the Data Connector will work fine without it), but it is recommended.

For a full description on how verifying the secret works, see the Verify Secret part of the Data Connector article.

For step-by-step instructions on adding this to the example integration, follow the steps below.

1. Add secret to Data Connector

On the Data Connector configuration page, enter a secret under Secret, for example “Secret-signature”. Remember to click Update configuration.

2. Add library to Cloud Function

We must add the library jsonwebtoken to the dependencies of the Cloud Function.

Navigate to the Cloud Function, go to Edit and update the package.json to be:

{
  "name": "sample-http",
  "version": "0.0.1",
  "dependencies": {
    "pg": "7.4.3",
    "jsonwebtoken": "8.3.0"
  }
}

3. Add secret verification code to Cloud Function

Switch to the index.js file and first add requirements at the top:

const jwt = require('jsonwebtoken');
const crypto = require('crypto');

Next, add the following code to the very start of the function:

    // Verify signature
    try {
        var token = req.get('x-dt-signature');
        var jwtPayload = jwt.verify(token, "Secret-signature"); // Update to your secret
    } catch(err) {
        console.error(err);
        res.sendStatus(401);
        return;
    }

    // Verify checksum
    var hash = crypto.createHash('sha1');
    hash.update(req.rawBody);

    if (jwtPayload.checksum != hash.digest('hex')) {
        console.error('Invalid content');
        res.sendStatus(401);
        return;
    }

The function will now reject all invocations with an invalid token or checksum.
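
If you want to be stricter, jsonwebtoken also lets you pin which signing algorithm is accepted. Assuming the Data Connector signs its tokens with HS256, the verification call can be tightened like this:

    // Only accept HS256-signed tokens (assumes the Data Connector uses HS256)
    var jwtPayload = jwt.verify(token, "Secret-signature", { algorithms: ["HS256"] });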

To test that the Cloud Function rejects requests with the wrong signature, try changing or removing the secret in the Data Connector and watch the Error metric increment as the Cloud Function rejects the events.
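
You can also trigger a rejection by hand by calling the function without a signature header. A hypothetical test using curl (replace REGION, PROJECT_ID and FUNCTION_NAME with your own values; the URL format assumes a default HTTP-triggered Cloud Function):

curl -i -X POST "https://REGION-PROJECT_ID.cloudfunctions.net/FUNCTION_NAME" \
     -H "Content-Type: application/json" \
     -d '{"event": {}}'

The response status should be 401 Unauthorized.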