Recording weather with Arduino, Elasticsearch and Kibana 3
Miss the first part of this series? It's here!
Part 3 - Software and Testing
One of the more amusing aspects of this exercise was to code for small platforms.
The main challenge revolves around measuring speed and rainfall, because these instruments simply generate a pulse when something happens, and the code needs to be ready to act, but at the same time, there is a degree of "housekeeping" which needs to be done.
The simplest method of doing this is to generate an interrupt whenever a pulse is received from one of the instruments. This way, the CPU can give you the appearance of doing multiple things (like housekeeping and uploading) at once.
I like to think my coding ability is adequate, but doing this on a Raspberry Pi, CHIP, BeagleBone etc. means I would have to write my ISRs (Interrupt Service Routines) as kernel modules, and that is just more work than I want to commit to for my own amusement. I needed to get back pretty much to the bare metal, which made Arduino a simpler choice.
However, if you have done any coding for Arduino, you soon realise that it is a single-threaded platform. In essence the code you upload plus the runtime libraries are the operating system. Don't worry, this is good news because it means we can largely do what we want with the processor, and the Atmel (now Microchip) CPUs are very flexible with interrupts and the Arduino ecosystem already has good library support.
Stay with me folks, this is going to get interesting...
We have three sources of interrupts in our system:
- Anemometer
- Rainfall
- Internal Timer
The key to how all this operates is the Internal Timer. I have used the TimerOne library (the listing includes it as TimerOne.h), which can be installed via the Arduino Library Manager.
Timer1 establishes an interrupt which fires every 500 ms. It is not perfectly accurate, but since the CPU is not working all that hard, it will be accurate enough for this job. The basic loop structure of the program is as follows:
loop() {
    while (timer count < 120) {
        // wait; the interrupts do their work in the background
    }
    increment minutes by 1
    collect readings
    build statistics for transmit
    transmit
    if minutes >= 1440
        reset daily statistics
}
So in essence, the CPU spends most of its time waiting for one minute (120 ticks of 500 ms) to expire.
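To make the tick counting a little more concrete, here is a minimal sketch in plain C++ that can be run on a desktop. The Scheduler type and its member names are my own invention for illustration; the real sketch keeps these values as globals and lets the timer interrupt bump the counter. It assumes 120 ticks of 500 ms per reading, as in the listing further down.

```cpp
#include <cstdint>

// Hypothetical names for illustration only.
constexpr uint16_t TICKS_PER_READING = 120;   // 120 ticks x 0.5 s = 60 s
constexpr uint16_t MINUTES_PER_DAY   = 1440;  // 24 hours

struct Scheduler {
    uint16_t ticks = 0;        // bumped by the (simulated) timer interrupt
    uint16_t minutes = 0;      // minutes since the last daily reset
    uint32_t readings = 0;     // how many readings have been taken
    uint32_t dailyResets = 0;  // how many times the daily statistics were reset

    // Stands in for the 500 ms timer ISR.
    void onTimerTick() { ++ticks; }

    // Stands in for one pass through loop(). Returns true if a reading was due.
    bool poll() {
        if (ticks < TICKS_PER_READING) {
            return false;      // still waiting for the minute to expire
        }
        ticks = 0;
        ++minutes;
        ++readings;            // collect, build and transmit would happen here
        if (minutes >= MINUTES_PER_DAY) {
            minutes = 0;
            ++dailyResets;     // reset daily statistics
        }
        return true;
    }
};
```

On real hardware the tick counter is shared between the ISR and loop(), so it would normally be declared volatile; that detail is left out here because the simulation is single-threaded.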
Sensor Interrupts
When the rainfall or wind speed sensors generate a pulse, an interrupt will be raised in the CPU. The ISR (Interrupt Service Routine) debounces the input by ignoring any pulse that arrives too soon after the previous one; the listing below uses a 15 ms window. Unless a cyclone hits Sydney, we won't miss any pulses in this debounce routine.
After the debounce check, either the rotation count (from which the wind speed is calculated) or the rain counter is incremented. Nothing else is done at this point, to make sure that we do as little as possible within the interrupt context.
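For anyone who has not written a time-based debounce before, the idea can be sketched in a few lines of plain C++. DebouncedCounter and onPulse are illustrative names of my own, and nowMs stands in for the value millis() would return inside the ISR.

```cpp
#include <cstdint>

// Accept a pulse only if enough time has passed since the last accepted one.
// All names here are hypothetical; nowMs plays the role of millis().
struct DebouncedCounter {
    uint32_t windowMs;            // minimum spacing between accepted pulses
    uint32_t lastAcceptedMs = 0;  // time of the last accepted pulse
    uint32_t count = 0;           // number of accepted pulses

    explicit DebouncedCounter(uint32_t window) : windowMs(window) {}

    // Returns true if the pulse was counted, false if treated as contact bounce.
    // Unsigned subtraction keeps working when the millisecond counter wraps.
    bool onPulse(uint32_t nowMs) {
        if (nowMs - lastAcceptedMs <= windowMs) {
            return false;
        }
        ++count;
        lastAcceptedMs = nowMs;
        return true;
    }
};
```

With a 15 ms window, pulses at 100 ms and 116 ms are both counted, while one at 110 ms is discarded as bounce. The ISR stays short because it only compares two numbers and bumps a counter.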
When the timer count reaches 1 minute (120 ticks x 0.5 s = 60 s), the real work begins. All the sensors are read, and then a timestamp needs to be generated.
Timing
The timestamp is necessary because Elasticsearch cannot generate one by itself. I could mess around with Logstash and have it do so, but since we have a network connection, why not use NTP to do so?
A further gotcha is that we need to convert the timestamp to milliseconds (ms) for consumption by Elasticsearch. Pretty simple: multiply the value by 1000.
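One thing to watch when multiplying: on the 8-bit AVR boards an unsigned long is only 32 bits wide, and a present-day Unix time in seconds times 1000 no longer fits in 32 bits. Below is a small sketch of two ways around that, written in plain C++ so it can be tried on a desktop. The function names are hypothetical, and treating 0 as "no time available" is my assumption, based on the NTP routine in the listing returning 0 on failure.

```cpp
#include <cstdint>
#include <string>

// Option 1: widen to 64 bits before multiplying, so nothing overflows.
uint64_t toEpochMillis(uint32_t unixSeconds) {
    return static_cast<uint64_t>(unixSeconds) * 1000ULL;
}

// Option 2: if the value only ever ends up in a JSON string,
// appending three zeros avoids 64-bit arithmetic altogether.
std::string toEpochMillisString(uint32_t unixSeconds) {
    if (unixSeconds == 0) {
        return "0";  // assumed "no time" marker; avoids emitting "0000"
    }
    return std::to_string(unixSeconds) + "000";
}
```

In an Arduino sketch the second option would amount to building the text with the String class and tacking "000" onto the end, which is cheap on a small CPU.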
However, if you don't want to (or can't) use NTP, you could use a small RTC module and ditch all the NTP code.
Sensor Calculations
This is the one place you are on your own. You need four pieces of information to get useful readings from your sensors:
- For the rainfall sensor, how many mm per "tip"
- For wind speed, how many pulses/revolution and the circumference of the sensor
- For wind direction, the resistor value for each position.
Rainfall is pretty simple. Most gauges will be between 0.1 and 1.0mm per tip event. If you can't find the value, you can always compare it with a rain gauge. Set BUCKET_SIZE to this value.
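The rainfall arithmetic is just tips multiplied by bucket size. A minimal sketch in plain C++, using the BUCKET_SIZE value from the listing as a default; the function name rainfallMm is my own and not part of the sketch.

```cpp
#include <cstdint>

constexpr float BUCKET_SIZE = 0.2794f;  // mm of rain per tipping event

// Total rainfall in millimetres for a given number of bucket tips.
// Kept as a float so fractions of a millimetre are not lost.
float rainfallMm(uint32_t tipCount, float mmPerTip = BUCKET_SIZE) {
    return static_cast<float>(tipCount) * mmPerTip;
}
```

Ten tips of a 0.2794 mm bucket works out to about 2.8 mm. Note that storing the result in an integer variable would silently drop everything after the decimal point, which matters for a gauge this fine.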
Wind speed is going to need some math. It's easy enough to use a multimeter or test lamp to measure pulses/rotation. For low-cost senders the answer seems to always be 1.
The math comes in because we need to know how far a single cup travels in a revolution. Measure the distance from the centre of the cup to the centre of the anemometer axle. This is the radius (r) of our circle. Double that to get the diameter (d). From high school you might remember that the circumference of a circle is calculated as:
pi x d
If you measured the diameter in millimetres (mm), then if you divide your result by 1,000 you get the circumference in metres. This is the value to set ROT_VALUE to.
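Here is the same calculation as a short sketch in plain C++, followed by the step from rotations to a speed. The function names and the pulsesPerRev parameter are hypothetical, and like the text it simply treats the distance travelled by a cup as the distance travelled by the wind.

```cpp
constexpr double PI_VALUE = 3.14159265358979323846;

// Circumference in metres from a diameter measured in millimetres.
// The result is the value to use for ROT_VALUE.
double circumferenceMetres(double diameterMm) {
    return PI_VALUE * diameterMm / 1000.0;
}

// Wind speed in km/h from pulses counted over a known number of seconds.
// rotValueMetres is the circumference; pulsesPerRev is 1 for most low-cost senders.
double windSpeedKmh(unsigned long pulses, double seconds,
                    double rotValueMetres, unsigned pulsesPerRev = 1) {
    if (seconds <= 0.0 || pulsesPerRev == 0) {
        return 0.0;  // nothing sensible to report
    }
    double revolutions = static_cast<double>(pulses) / pulsesPerRev;
    double metresPerSecond = revolutions * rotValueMetres / seconds;
    return metresPerSecond * 3.6;  // m/s to km/h
}
```

Doing the division in floating point matters here: with integer arithmetic, a small pulse count divided by the interval rounds down to zero before the multiplication ever happens.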
The wind direction resistance is for reference only at this point. When we start feeding data into Elasticsearch, you will see how we translate it into a direction.
Networking
Much of the code for this project is to support the networking tasks. The EtherTen is compatible with the Arduino Ethernet Shields, so the code should need little, if any, modification. Most of this code has come from the Arduino examples, and should be easy enough to understand. The biggest catch is that the MAC address for the controller is hard-coded. Later versions of the EtherTen than mine have a unique MAC address. If you run up two boards with the same code, you are going to have problems on the local network, so be sure to change the values in the byte mac[] line.
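If you are running several boards, one simple approach is to keep the first five bytes and vary the last one per board. The sketch below is only an illustration in plain C++: macForBoard and isLocallyAdministeredUnicast are names I made up, and the scheme of using the last byte as a board number is an assumption, not something the original code does. The first byte of the address in the listing is 0x02, which marks it as a locally administered, unicast address, so it will not collide with a manufacturer-assigned one.

```cpp
#include <array>
#include <cstdint>

using MacAddress = std::array<uint8_t, 6>;

// Build a MAC address that differs per board in its last byte.
// The first five bytes match the address used in the listing.
MacAddress macForBoard(uint8_t boardId) {
    MacAddress mac = {{0x02, 0x60, 0x8C, 0x01, 0x02, boardId}};
    return mac;
}

// True if the address is locally administered (bit 1 of the first byte set)
// and unicast (bit 0 of the first byte clear).
bool isLocallyAdministeredUnicast(const MacAddress& mac) {
    return (mac[0] & 0x02) != 0 && (mac[0] & 0x01) == 0;
}
```

With this scheme, board 3 gets 02:60:8C:01:02:03 (the address in the listing) and board 4 gets 02:60:8C:01:02:04, so the two can share a network.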
The Code
Please note that this is not always going to be the most recent version of the code. Current releases can be found on Bitbucket. The URL is:
There are usually 2 branches of the code on Bitbucket. The first is "master", which is tested and stable. If you use the same basic components as I did, there's no reason it should not work for you.
The other is "develop", which is where all the fun stuff is happening. Any new features get added via "develop", so it might not always be perfect, but it will at least compile and run for me.
/*
 * Inspired by a lot of different ideas
 * An Arduino-based weather station.
 *
 * Owing to memory limitations, this sketch does NOT use SSL.
 * Be careful what you send over it.
 */

/*
 * PORT ALLOCATIONS
 * PORT  DESIGNATION  DIRECTION  TYPE      DESCRIPTION
 * D13   LED_PIN      OUT        DIGITAL   On-Board LED. General Indication
 * D12   DATA_PIN     OUT        DIGITAL   "Network" LED. Activity indication
 * D3    RAIN_PIN     IN         DIGITAL   Rain Sensor Input - 1 pulse/tip
 * D2    SPEED_PIN    IN         DIGITAL   Speed Sensor Input - 1 pulse/revolution
 * A5    SCK          OUT        DIGITAL   I2C SCK (Clock Signal)
 * A4    SDA          IN/OUT     DIGITAL   I2C SDA (Data I/O)
 * A1    VANE_PIN     IN         ANALOGUE  Wind Direction
 * A0    UV_PIN       IN         ANALOGUE  UV Sensor
 *
 */

/*
 *
 * Note: You need to create the ElasticSearch Index and Mappings beforehand
 * curl -XPUT 'http://host:9200/test?pretty' -H 'Content-Type: application/json' -d'
 {
   "mappings" : {
     "reading" : {
       "properties" : {
         "deviceid" : { "type" : "text" },
         "temperature" : { "type" : "float" },
         "humidity" : { "type" : "float" },
         "pressure" : { "type" : "float" },
         "dewpoint" : { "type" : "float" },
         "windspeed" : { "type" : "float" },
         "winddirection" : { "type" : "float" },
         "uv" : { "type" : "float" },
         "timestamp" : { "type" : "date" }
       }
     }
   }
 }
 '
 */

#include <TimerOne.h>
#include <Ethernet.h>
#include <EthernetUdp.h>
#include <Dhcp.h>
#include <EthernetClient.h>
#include <Dns.h>
#include <BME280.h>
#include <BME280I2C.h>

//#define _DEBUG
#define _SPLUNK
//#define _ELASTIC

#ifdef _ELASTIC
#define HOST "your-elastic-host"
#define PORT 9200
#endif

#ifdef _SPLUNK
#define HOST "your-splunk-host"
#define PORT 8088
#endif

#define BUCKET_SIZE 0.2794      // Rain in mm per tipping event
#define ROT_VALUE   0.666667    // Distance of one rotation

#define RAIN_PIN  3
#define SPEED_PIN 2
#define UV_PIN    (A0)
#define VANE_PIN  (A1)
#define LED_PIN   13
#define DATA_PIN  12

BME280I2C bme;
EthernetClient client;
EthernetUDP udp;

unsigned long unixTime;
float temperature;
float humidity;
float pressure;
float windSpeed;
float dewPoint;
unsigned long tipCount;
unsigned long contactTime;
unsigned long contactBounceTime;
unsigned long totalRainfall;
unsigned long rotationCount;
unsigned int uv;
unsigned int timerCount;
unsigned int readingTime;
unsigned int totalMinutes;
int vaneReading;
int windDirection;
boolean masterStatus;

void readTemperature() {
  temperature = bme.temp(true);
}

void readHumidity() {
  humidity = bme.hum();
}

void readPressure() {
  pressure = bme.pres(1);
}

void readDewPoint() {
  dewPoint = bme.dew(bme.temp(), bme.hum(), true);
}

/*
 * Vout (mV)   UV Index
 * < 50        0
 * 227         1
 * 318         2
 * 408         3
 * 503         4
 * 606         5
 * 696         6
 * 795         7
 * 881         8
 * 976         9
 * 1079        10
 * > 1170      11+
 */
void readUV() {
  // uv = analogRead(UV_PIN);
  uv = map(analogRead(UV_PIN), 0, 1023, 0, 5000);
}

void readWindDirection() {
  vaneReading = analogRead(VANE_PIN);
  windDirection = map(vaneReading, 0, 1023, 0, 5000);   // Convert to mV
}

void isr_rain() {
  if((millis() - contactTime) > 15) {
    tipCount++;
    totalRainfall = tipCount * BUCKET_SIZE;
    contactTime = millis();
  }
}

void isr_wind() {
  // Temporary. Show that we are in the ISR
  digitalWrite(DATA_PIN, HIGH);
  if((millis() - contactBounceTime) > 15) {
    rotationCount ++;
    contactBounceTime = millis();
  }
  digitalWrite(DATA_PIN, LOW);
}

void isr_timer() {
  digitalWrite(LED_PIN, HIGH);
  timerCount++;
  readingTime++;
  if(timerCount == 5) {
    // Need to divide count by 60 to get m/s
    // Then multiply by 3.6 for Km/h
    windSpeed = rotationCount / 60 * ROT_VALUE * 3.6;
    rotationCount = 0;
    timerCount = 0;
  }
  digitalWrite(LED_PIN, LOW);
}

void blinkLED(unsigned int times) {
  for(int i = 0; i < times; i++) {
    digitalWrite(LED_PIN, HIGH);
    delay(75);
    digitalWrite(LED_PIN, LOW);
    delay(75);
  }
}

unsigned long inline ntpUnixTime (UDP &udp) {
  static int udpInited = udp.begin(123);      // open socket on arbitrary port

  const char timeServer[] = "pool.ntp.org";   // NTP server

  // Only the first four bytes of an outgoing NTP packet need to be set
  // appropriately, the rest can be whatever.
  const long ntpFirstFourBytes = 0xEC0600E3;  // NTP request header

  // Fail if WiFiUdp.begin() could not init a socket
  if (! udpInited)
    return 0;

  // Clear received data from possible stray received packets
  udp.flush();

  // Send an NTP request
  digitalWrite(DATA_PIN, HIGH);
  if (! (udp.beginPacket(timeServer, 123)     // 123 is the NTP port
         && udp.write((byte *)&ntpFirstFourBytes, 48) == 48
         && udp.endPacket()))
    return 0;                                 // sending request failed

#ifdef _DEBUG
  Serial.println("Waiting for ntp");
#endif

  // Wait for response; check every pollIntv ms up to maxPoll times
  const int pollIntv = 150;                   // poll every this many ms
  const byte maxPoll = 15;                    // poll up to this many times
  int pktLen;                                 // received packet length
  for (byte i=0; i<maxPoll; i++) {
    if ((pktLen = udp.parsePacket()) == 48)
      break;
    delay(pollIntv);
  }
  if (pktLen != 48)
    return 0;                                 // no correct packet received

  // Read and discard the first useless bytes
  // Set useless to 32 for speed; set to 40 for accuracy.
  const byte useless = 40;
  for (byte i = 0; i < useless; ++i)
    udp.read();

  // Read the integer part of sending time
  unsigned long time = udp.read();            // NTP time
  for (byte i = 1; i < 4; i++)
    time = time << 8 | udp.read();

  // Round to the nearest second if we want accuracy
  // The fractionary part is the next byte divided by 256: if it is
  // greater than 500ms we round to the next second; we also account
  // for an assumed network delay of 50ms, and (0.5-0.05)*256=115;
  // additionally, we account for how much we delayed reading the packet
  // since its arrival, which we assume on average to be pollIntv/2.
  time += (udp.read() > 115 - pollIntv/8);

  // Discard the rest of the packet
  udp.flush();
  digitalWrite(DATA_PIN, LOW);

  return time - 2208988800ul;                 // convert NTP time to Unix time
}

void setup() {
  pinMode(LED_PIN, OUTPUT);
  pinMode(DATA_PIN, OUTPUT);
  pinMode(RAIN_PIN, INPUT);
  pinMode(SPEED_PIN, INPUT);
  digitalWrite(DATA_PIN, HIGH);

  // If using the Freetronics EtherTen board,
  // the W5100 takes a bit longer to come out of reset
  delay(100);   // This should be plenty

#ifdef _DEBUG
  Serial.begin(9600);
  while(!Serial) {
    blinkLED(1);
    delay(10);  // Wait for serial port
  }
#endif

  while(bme.begin() == false) {
#ifdef _DEBUG
    Serial.println("Waiting for BME280");
#endif
    blinkLED(2);
    delay(1000);
  }
#ifdef _DEBUG
  Serial.println("BME280 is initialised!");
#endif

  byte mac[] = { 0x02, 0x60, 0x8C, 0x01, 0x02, 0x03 };
  while(Ethernet.begin(mac) == 0) {
#ifdef _DEBUG
    Serial.println("Waiting for DHCP");
#endif
    blinkLED(3);
    delay(1000);
  }
#ifdef _DEBUG
  Serial.println("Network is active");
#endif

  // Set all the globals to a valid starting point.
  temperature = 0.0;
  humidity = 0.0;
  pressure = 0.0;
  uv = 0;
  totalRainfall = 0;
  tipCount = 0;
  contactTime = 0;
  contactBounceTime = 0;
  rotationCount = 0;
  timerCount = 0;
  readingTime = 0;
  totalMinutes = 0;   // Use this to determine 24 hours

  // Initialise the interrupt for the rain gauge
  attachInterrupt(digitalPinToInterrupt(RAIN_PIN), isr_rain, FALLING);
#ifdef _DEBUG
  Serial.println("Attached Rain Sensor");
#endif

  // Initialise the interrupt for wind speed
  attachInterrupt(digitalPinToInterrupt(SPEED_PIN), isr_wind, RISING);
#ifdef _DEBUG
  Serial.println("Attached Speed Sensor");
#endif

  // Set up a timer for 1/2 second intervals
  Timer1.initialize(500000);
  Timer1.attachInterrupt(isr_timer);
#ifdef _DEBUG
  Serial.println("Attached Timer");
#endif

  sei();  // Enable interrupts
  digitalWrite(DATA_PIN, LOW);
}

void loop() {
  if(readingTime > 119) {   // 1 minute has expired
    blinkLED(4);
    readTemperature();
    readHumidity();
    readPressure();
    readDewPoint();
    readUV();
    digitalWrite(DATA_PIN, HIGH);

    // Let's build a stamp for the elastic store
    unixTime = ntpUnixTime(udp);

#ifdef _ELASTIC
    String data = "{ \"deviceid\": \"02608C010203\",";
    data += "\"temperature\": \"" + String(temperature) + "\", ";
    data += "\"humidity\": \"" + String(humidity) + "\", ";
    data += "\"pressure\": \"" + String(pressure) + "\", ";
    data += "\"dewpoint\": \"" + String(dewPoint) + "\", ";
    data += "\"rainfall\": \"" + String(totalRainfall) + "\", ";
    data += "\"windspeed\": \"" + String(windSpeed) + "\", ";
    data += "\"winddirection\" :\"" + String(windDirection) + "\", ";
    data += "\"uv\": \"" + String(uv) + "\", ";
    data += "\"timestamp\" :\"" + String(unixTime) + "\"";
    data += " }";
#endif

#ifdef _SPLUNK
    String data = "{ \"host\": \"02608C010203\",";
    data += "\"sourcetype\": \"arduino\", ";
    data += "\"index\": \"myiot\", ";
    data += "\"event\": { ";
    data += "\"temp\": \"" + String(temperature) + "\", ";
    data += "\"hum\": \"" + String(humidity) + "\", ";
    data += "\"press\": \"" + String(pressure) + "\", ";
    data += "\"dewpt\": \"" + String(dewPoint) + "\", ";
    data += "\"rain\": \"" + String(totalRainfall) + "\", ";
    data += "\"speed\": \"" + String(windSpeed) + "\", ";
    data += "\"dir\": \"" + String(windDirection) + "\", ";
    data += "\"uv\": \"" + String(uv) + "\" ";
    data += " }}";
#endif

#ifdef _DEBUG
    Serial.println("Connecting to: " + String(HOST) + ":" + String(PORT));
#endif

    if(client.connect(HOST, PORT)) {
#ifdef _DEBUG
      Serial.println("Connected");
      Serial.println(data);
#endif

#ifdef _ELASTIC
      String url = "/test/reading";
#endif
#ifdef _SPLUNK
      String url = "/services/collector";
#endif

      client.println("POST " + url + " HTTP/1.1");
#ifdef _SPLUNK
      client.println("Authorization: Splunk AUTH_KEY");
#endif
      client.print("Content-Length: ");
      client.println(data.length());
      client.println();
      client.println(data);
      client.println();
      delay(50);
      if(client.connected()) {
        client.flush();
        client.stop();
      }
      digitalWrite(DATA_PIN, LOW);
    } else {
#ifdef _DEBUG
      Serial.println("Connect failed");
#endif
    }
    readingTime = 0;

    // General housekeeping stuff
    totalMinutes++;
    if(totalMinutes > 1440) {   // 24 hours
      totalRainfall = 0;
      totalMinutes = 0;
    }
  }
}
Next: Getting data into Elasticsearch