Recording weather with Arduino, Elasticsearch and Kibana 3

Recording weather with Arduino, Elasticsearch and Kibana

Miss the first part of this series?  It's here!

Part 3 - Software and Testing

One of the more amusing aspects of this exercise was to code for small platforms.

The main challenge revolves around measuring speed and rainfall, because these instruments simply generate a pulse when something happens, and the code needs to be ready to act, but at the same time, there is a degree of "housekeeping" which needs to be done.

The simplest method of doing this is to generate an interrupt whenever a pulse is received from one of the instruments.  This way, the CPU can give you the appearance of doing multiple things (like housekeeping and uploading) at once.

I like to think my coding ability is adequate, but doing this on an Raspberry  Pi, CHIP, BeagleBone etc means I have to write my ISR (Interrupt Service Routines) as kernel modules, and that is just more work than I want to commit to for my own amusement.  I needed to get back pretty much to the bare metal, which made Arduino a simpler choice.

However, if you have done any coding for Arduino, you soon realise that it is a single-threaded platform.  In essence the code you upload plus the runtime libraries are the operating system.  Don't worry, this is good news because it means we can largely do what we want with the processor, and the Atmel (now Microchip) CPUs are very flexible with interrupts and the Arduino ecosystem already has good library support.

Stay with me folks, this is going to get interesting...

We have three sources of interrupts in our system:
  • Anemometer
  • Rainfall
  • Internal Timer
The key to how all this operates is the Internal Timer.  I have used the "Timer-1" library which can be installed via the Arduino Library Manager.

Timer1 establishes an interrupt which fires every 500mS.  It is not perfectly accurate, but since the CPU is not working all that hard, it will be accurate enough for this job.  The basic loop structure of the program is as follows:

loop() {
    while(timer count < 120) {
    }
    increment minutes by 2
    collect readings
    build statistics for transmit
    transmit
    if minutes >= 1440
        reset daily statistics
}

So in essence, the CPU spends most of its time waiting for two minutes to expire.

Sensor Interrupts
When the rainfall or wind speed sensors generate a pulse, an interrupt will be raised in the CPU.  The ISR (Interrupt Service Routine) delays 50mS to debounce the input.  Unless a cyclone hits Sydney, we won't miss any pulses in this debounce routine.

After the debounce period, either the wind speed is calculated or the rain counter is incremented.  No other work is done in this time to make sure that we do as little as possible within the interrupt context.

When the timer count exceeds 1 minute (2 x 0.5s = 120 ticks), the real work begins.  All the sensors are read, and then a timestamp needs to be generated.

Timing
The timestamp is necessary because Elasticsearch cannot generate one by itself.  I could mess around with log stash and have it do so, but since we have a network connection, why not use NTP to do so?

A further gotcha, is that we need to convert the timestamp to mS for consumption by Elasticsearch.  Pretty simple, multiply the value by 1000.

However if you don't want (or can't) use NTP, you could use a small RTC module and ditch all the NTP code.

Sensor Calculations
This is the one place you are on your own.  You need four pieces of information to get useful readings from your sensors:
  • For the rain fall sensor, how many mm per "tip"
  • For wind speed, how many pulses/revolution and the circumference of the sensor
  • For wind direction, the values of resistor for each position.
Rainfall is pretty simple.  Most gauges will be between 0.1 and 1.0mm per tip event. If you can't find the value, you can always compare it with a rain gauge.  Set BUCKET_SIZE to this value.

Wind speed is going to need some math.  It's easy enough to use a multimeter or test lamp to measure pulses/rotation.  For low-cost senders the answer seems to always be 1.

The math comes in because we need to know how far a single cup travels in a revolution.  Measure the distance from the centre of the cup, to the centre of the anemometer axle.  This is the radius (r) of our circle.  Double that to get the diameter.  From high-school you might remember that the circumference of a circle is calculated as:

pi x d

If you measured the diameter in millimetres (mm), then if you divide your result by 1,000 you get the circumference in metres.  This is the value to set ROT_VALUE to.

The Wind direction resistance is for reference only at this point.  When we start feeding data into Elasticsearch, you will see how we translate it into a direction.

Networking
Much of the code for this project is to support the networking tasks.  The Ether10 is compatible with the Arduino Ethernet Shields, so the code should need little, if any, modifications.  Most of this code has come from the Arduino examples, and should be easy enough to understand.  The biggest catch is that the MAC address for the controller is hard-coded.  Later versions of Ether10 than mine have a unique IP address.  If you ran up two boards with the same code, you can going to have problems on the local network, so be sure to change the values in the byte mac[] line.

The Code

Please note that this is not always going to be the most recent version of the code.  Current releases can be found on bitbucket.  The URL is:


There are usually 2 branches of the code on Bitbucket.  The first is "master", which is tested and stable.  If you use the same basic components as I did, there's no reason it should not work for you.

The other is "develop", which is where all the fun stuff is happening.  Any new features get added via "develop", so it might not always be perfect, but it will at least compile and run for me.


/*
 * Inspired by a lot of different ideas
 * An Arduino-based weather station.
 * 
 * Owing to memory limitations, this sketch does NOT use SSL.
 * Be careful what you send over it.
 */

 /*
  * PORT ALLOCATIONS
  * PORT      DESIGNATION     DIRECTION     TYPE      DESCRIPTION
  * D13       LED_PIN         OUT           DIGITAL   On-Board LED.  General Indication
  * D12       DATA_PIN        OUT           DIGITAL   "Network" LED.  Activity indication
  * D3        RAIN_PIN        IN            DIGITAL   Rain Sensor Input - 1 pulse/tip
  * D2        SPEED_PIN       IN            DIGITAL   Speed Sensor Input - 1 pulse/revolution
  * A5        SCK             OUT           DIGITAL   I2C SCK (Clock Signal)
  * A4        SDA             IN/OUT        DIGITAL   I2C SDA (Data I/O)
  * A1        VANE_PIN        IN            ANALOGUE  Wind Direction
  * A0        UV_PIN          IN            ANALOGUE  UV Sensor
  * 
  */

/*
 *
 * Note:  You need to create the ElasticSearch Index and Mappings beforehand
 * curl -XPUT 'http://host:9200/test?pretty' -H 'Content-Type: application/json' -d'
{
    "mappings" : {
        "reading" : {
            "properties" : {
                "deviceid" : { "type" : "text" },
                "temperature" : { "type" : "float" },
                "humidity" : { "type" : "float" },
                "pressure" : { "type" : "float" },
                "dewpoint" : { "type" : "float" },
                "windspeed" : { "type" : "float" },
                "winddirection" : { "type" : "float" },
                "uv" : { "type" : "float" },
                "timestamp" : { "type" : "date" } 
            }
        }
    }
}
'

  */
#include <TimerOne.h>
#include <Ethernet.h>
#include <EthernetUdp.h>
#include <Dhcp.h>
#include <EthernetClient.h>
#include <Dns.h>
#include <BME280.h>
#include <BME280I2C.h>

//#define _DEBUG
#define _SPLUNK
//#define _ELASTIC

#ifdef _ELASTIC
#define HOST        "your-elastic-host"
#define PORT        9200
#endif
#ifdef _SPLUNK
#define HOST        "your-splunk-host"
#define PORT        8088
#endif
#define BUCKET_SIZE 0.2794   // Rain in mm per tipping event
#define ROT_VALUE   0.666667 // Distance of one rotation
#define RAIN_PIN    3
#define SPEED_PIN   2
#define UV_PIN      (A0)
#define VANE_PIN    (A1)
#define LED_PIN     13
#define DATA_PIN    12

BME280I2C bme;
EthernetClient client;
EthernetUDP udp;
unsigned long unixTime;
  

float temperature;
float humidity;
float pressure;
float windSpeed;
float dewPoint;
unsigned long tipCount;
unsigned long contactTime;
unsigned long contactBounceTime;
unsigned long totalRainfall;
unsigned long rotationCount;
unsigned int uv;
unsigned int timerCount;
unsigned int readingTime;
unsigned int totalMinutes;
int vaneReading;
int windDirection;
boolean masterStatus;

void readTemperature() {
  temperature = bme.temp(true);
}

void readHumidity() {
  humidity = bme.hum();
}

void readPressure() {
  pressure = bme.pres(1);
}

void readDewPoint() {
  dewPoint = bme.dew(bme.temp(), bme.hum(), true);
}

/*
 * Vout (mV)    UV Index
 * < 50         0
 * 227          1
 * 318          2
 * 408          3
 * 503          4
 * 606          5
 * 696          6
 * 795          7
 * 881          8
 * 976          9
 * 1079         10
 * > 1170       11+
 */
void readUV() {
//  uv = analogRead(UV_PIN);
  uv = map(analogRead(UV_PIN), 0, 1023, 0, 5000);
}

void readWindDirection() {
  vaneReading = analogRead(VANE_PIN);
  windDirection = map(vaneReading, 0, 1023, 0, 5000); // Convert to mV
}

void isr_rain() {
  if((millis() - contactTime) > 15) {
    tipCount++;
    totalRainfall = tipCount * BUCKET_SIZE;
    contactTime = millis();
  }
}

void isr_wind() {
  // Temporary.  Show that we are in the ISR
  digitalWrite(DATA_PIN, HIGH);
  if((millis() - contactBounceTime) > 15) {
    rotationCount ++;
    contactBounceTime = millis();
  }
  digitalWrite(DATA_PIN, LOW);
}

void isr_timer() {
  digitalWrite(LED_PIN, HIGH);
  
  timerCount++;
  readingTime++;

  if(timerCount == 5) {
    // Need to divide count by 60 to get m/s
    // Then multiply by 3.6 for Km/h
    windSpeed = rotationCount / 60 * ROT_VALUE * 3.6;
    rotationCount = 0;
    timerCount = 0;
  }
  digitalWrite(LED_PIN, LOW);
}

void blinkLED(unsigned int times) {
  for(int i = 0; i < times; i++) {
    digitalWrite(LED_PIN, HIGH);
    delay(75);
    digitalWrite(LED_PIN, LOW);
    delay(75);
  }
}

unsigned long inline ntpUnixTime (UDP &udp)
{
  static int udpInited = udp.begin(123); // open socket on arbitrary port

  const char timeServer[] = "pool.ntp.org";  // NTP server

  // Only the first four bytes of an outgoing NTP packet need to be set
  // appropriately, the rest can be whatever.
  const long ntpFirstFourBytes = 0xEC0600E3; // NTP request header

  // Fail if WiFiUdp.begin() could not init a socket
  if (! udpInited)
    return 0;

  // Clear received data from possible stray received packets
  udp.flush();

  // Send an NTP request
  digitalWrite(DATA_PIN, HIGH);
  if (! (udp.beginPacket(timeServer, 123) // 123 is the NTP port
   && udp.write((byte *)&ntpFirstFourBytes, 48) == 48
   && udp.endPacket()))
    return 0;       // sending request failed

#ifdef _DEBUG
  Serial.println("Waiting for ntp");
#endif
  // Wait for response; check every pollIntv ms up to maxPoll times
  const int pollIntv = 150;   // poll every this many ms
  const byte maxPoll = 15;    // poll up to this many times
  int pktLen;       // received packet length
  for (byte i=0; i<maxPoll; i++) {
    if ((pktLen = udp.parsePacket()) == 48)
      break;
    delay(pollIntv);
  }
  if (pktLen != 48)
    return 0;       // no correct packet received

  // Read and discard the first useless bytes
  // Set useless to 32 for speed; set to 40 for accuracy.
  const byte useless = 40;
  for (byte i = 0; i < useless; ++i)
    udp.read();

  // Read the integer part of sending time
  unsigned long time = udp.read();  // NTP time
  for (byte i = 1; i < 4; i++)
    time = time << 8 | udp.read();

  // Round to the nearest second if we want accuracy
  // The fractionary part is the next byte divided by 256: if it is
  // greater than 500ms we round to the next second; we also account
  // for an assumed network delay of 50ms, and (0.5-0.05)*256=115;
  // additionally, we account for how much we delayed reading the packet
  // since its arrival, which we assume on average to be pollIntv/2.
  time += (udp.read() > 115 - pollIntv/8);

  // Discard the rest of the packet
  udp.flush();
  digitalWrite(DATA_PIN, LOW);
  return time - 2208988800ul;   // convert NTP time to Unix time
}



void setup() {
  pinMode(LED_PIN, OUTPUT);
  pinMode(DATA_PIN, OUTPUT);
  pinMode(RAIN_PIN, INPUT);
  pinMode(SPEED_PIN, INPUT);
  digitalWrite(DATA_PIN, HIGH);
  
  // If using the Freetronics EtherTen board,
  // the W5100 takes a bit longer to come out of reset
  delay(100);   // This should be plenty

#ifdef _DEBUG
  Serial.begin(9600);
  while(!Serial) {
    blinkLED(1);
    delay(10);    // Wait for serial port
  }
#endif
  
  while(bme.begin() == false) {
#ifdef _DEBUG    
    Serial.println("Waiting for BME280");
#endif
    blinkLED(2);
    delay(1000);
  }

#ifdef _DEBUG
  Serial.println("BME280 is initialised!");
#endif

  byte mac[] = {
    0x02, 0x60, 0x8C, 0x01, 0x02, 0x03
  };
  while(Ethernet.begin(mac) == 0) {
#ifdef _DEBUG    
    Serial.println("Waiting for DHCP");
#endif
    blinkLED(3);
    delay(1000);
  }
#ifdef _DEBUG
  Serial.println("Network is active");
#endif

  // Set all the globals to a valid starting point.
  temperature = 0.0;
  humidity = 0.0;
  pressure = 0.0;
  uv = 0;
  totalRainfall = 0;
  tipCount = 0;
  contactTime = 0;
  contactBounceTime = 0;
  rotationCount = 0;
  timerCount = 0;
  readingTime = 0;
  totalMinutes = 0;   // Use this to determine 24 hours

  // Initialise the interrupt for the rain gauge
  attachInterrupt(digitalPinToInterrupt(RAIN_PIN), isr_rain, FALLING);
#ifdef _DEBUG
  Serial.println("Attached Rain Sensor");
#endif

  // Initialise the interrupt for wind speed
  attachInterrupt(digitalPinToInterrupt(SPEED_PIN), isr_wind, RISING);
#ifdef _DEBUG
  Serial.println("Attached Speed Sensor");
#endif

  // Set up a timer for 1/2 second intervals
  Timer1.initialize(500000);
  Timer1.attachInterrupt(isr_timer); 
#ifdef _DEBUG
  Serial.println("Attached Timer");
#endif

  sei();    // Enable interrupts
  
  digitalWrite(DATA_PIN, LOW);
}

void loop() {
  if(readingTime > 119) {    // 1 minute has expired

    blinkLED(4);
    readTemperature();
    readHumidity();
    readPressure();
    readDewPoint();
    readUV();

    digitalWrite(DATA_PIN, HIGH);
    // Let's build a stamp for the elastic store
    unixTime = ntpUnixTime(udp);

#ifdef _ELASTIC
    String data = "{ \"deviceid\": \"02608C010203\",";
    data += "\"temperature\": \"" + String(temperature) + "\", ";
    data += "\"humidity\": \"" + String(humidity) + "\", ";
    data += "\"pressure\": \"" + String(pressure) + "\", ";
    data += "\"dewpoint\": \"" + String(dewPoint) + "\", ";
    data += "\"rainfall\": \"" + String(totalRainfall) + "\", ";
    data += "\"windspeed\": \"" + String(windSpeed) + "\", ";
    data += "\"winddirection\" :\"" + String(windDirection) + "\", ";
    data += "\"uv\": \"" + String(uv) + "\", ";
    data += "\"timestamp\" :\"" + String(unixTime) + "\"";
    data += " }";
#endif

#ifdef _SPLUNK
    String data = "{ \"host\": \"02608C010203\",";
    data += "\"sourcetype\": \"arduino\", ";
    data += "\"index\": \"myiot\", ";
    data += "\"event\": { ";
    data += "\"temp\": \"" + String(temperature) + "\", ";
    data += "\"hum\": \"" + String(humidity) + "\", ";
    data += "\"press\": \"" + String(pressure) + "\", ";
    data += "\"dewpt\": \"" + String(dewPoint) + "\", ";
    data += "\"rain\": \"" + String(totalRainfall) + "\", ";
    data += "\"speed\": \"" + String(windSpeed) + "\", ";
    data += "\"dir\": \"" + String(windDirection) + "\", ";
    data += "\"uv\": \"" + String(uv) + "\" ";
    data += " }}";
#endif

#ifdef _DEBUG
    Serial.println("Connecting to: " + String(HOST) + ":" + String(PORT));
#endif    
    if(client.connect(HOST, PORT)) {
#ifdef _DEBUG
      Serial.println("Connected");
      Serial.println(data);
#endif
#ifdef _ELASTIC
      String url = "/test/reading";
#endif
#ifdef _SPLUNK
      String url = "/services/collector";
#endif
      client.println("POST " + url + " HTTP/1.1");
#ifdef _SPLUNK
      client.println("Authorization: Splunk AUTH_KEY");
#endif
      client.print("Content-Length: ");
      client.println(data.length());
      client.println();
      client.println(data);
      client.println();
      
      delay(50);
      
      if(client.connected()) {
        client.flush();
        client.stop();
      }
      digitalWrite(DATA_PIN, LOW);
    }
    else {
#ifdef _DEBUG
    Serial.println("Connect failed");
#endif      
    }
    readingTime = 0;
    // General housekeeping stuff
    totalMinutes++;
    if(totalMinutes > 1440) { // 24 hours
      totalRainfall = 0;
      totalMinutes = 0;
    }
  }
}

Next:  Getting data into Elasticsearch

No comments:

Post a Comment

Wasting your and my time

I had a really interesting experience recently which I hope might enlighten others as much as it did me: I was approached (via LinkedIn) by ...