Introduction and System Overview
Air pollution monitoring has become increasingly critical in urban environments where industrial activities, vehicle emissions, and other sources contribute to deteriorating air quality. Traditional monitoring systems often rely on expensive, centralized equipment that provides limited spatial coverage. This article presents a comprehensive solution for building a distributed Internet of Things (IoT) network that enables cost-effective, wide-area air pollution monitoring using multiple remote sensor devices.
The proposed system architecture consists of three primary components working in harmony. First, numerous ESP32-based sensor nodes equipped with air quality sensors continuously collect environmental data from their respective locations. Second, a central web server receives, processes, and stores data from all registered devices while maintaining their geographical information. Third, an intelligent data processing system that utilizes artificial intelligence algorithms to analyze collected data and provide meaningful insights to end users.
The fundamental operation principle involves each sensor device measuring air pollution parameters at regular intervals and transmitting this data along with its unique identifier to the central server. The server then processes incoming data by grouping measurements from devices within configurable geographical regions and computing average pollution levels for those areas. Users can query the system to obtain real-time air quality information for any specified location and radius.
This distributed approach offers several advantages over traditional monitoring systems. The network provides higher spatial resolution by deploying multiple low-cost sensors across a wide area. The system demonstrates excellent scalability, allowing new devices to be added without significant infrastructure changes. Additionally, the redundancy inherent in multiple sensors improves data reliability and system robustness.
Hardware Architecture and Component Selection
ESP32 Microcontroller Platform
The ESP32 serves as the foundation for each sensor node due to its integrated WiFi capabilities, sufficient processing power, and extensive peripheral support. The ESP32 features a dual-core Tensilica Xtensa LX6 microprocessor running at up to 240 MHz, providing adequate computational resources for sensor data processing and network communication. The integrated 802.11 b/g/n WiFi transceiver eliminates the need for additional networking hardware, reducing both cost and complexity.
The ESP32 development board includes essential components such as voltage regulators, crystal oscillators, and USB-to-serial converters that simplify the development process. The board provides numerous GPIO pins for sensor interfacing, including analog-to-digital converters, I2C, SPI, and UART communication interfaces. The built-in flash memory stores the firmware program, while the RAM provides sufficient space for data buffering and network operations.
Air Quality Sensor Selection
For air pollution measurement, the MQ-135 gas sensor provides a cost-effective solution for detecting various air pollutants including ammonia, nitrogen oxides, benzene, smoke, and carbon dioxide. This semiconductor-based sensor operates by measuring changes in electrical conductivity when exposed to target gases. The sensor output varies proportionally to the concentration of detected pollutants, providing an analog voltage signal that the ESP32 can easily process.
The MQ-135 sensor requires a heating element to maintain optimal operating temperature, which consumes approximately 150 milliwatts of power. The sensor includes both analog and digital outputs, though the analog output provides more precise measurements suitable for quantitative analysis. The response time typically ranges from 10 to 60 seconds, making it suitable for continuous monitoring applications.
Circuit Design and Connections
The hardware circuit connects the MQ-135 sensor to the ESP32 through a simple interface that requires minimal external components. The sensor's VCC pin connects to the ESP32's 3.3V power supply, while the ground pin connects to the common ground. The analog output pin connects to one of the ESP32's ADC-capable GPIO pins, specifically GPIO34 in this implementation.
A pull-up resistor of 10 kilohms connects between the sensor's analog output and the power supply to ensure stable readings. Additionally, a bypass capacitor of 100 microfarads connects between power and ground near the sensor to filter power supply noise. These components ensure reliable sensor operation and accurate measurements.
The complete circuit requires minimal external components, making it suitable for compact, low-cost deployment. The ESP32's built-in voltage regulation eliminates the need for additional power conditioning circuits when powered from a 5V USB source or appropriate wall adapter.
ESP32 Air Quality Sensor Circuit Diagram:
+3.3V
|
+-+
| | 10kΩ
| |
+-+
|
ESP32 | MQ-135
+-------+ | +-------+
| | +-------------|VCC |
|GPIO34 |<---------------------- |A0 |
| | | |
|GND |------------------------|GND |
| | | |
|3.3V |------------------------|VCC |
+-------+ +-------+
|
-----
----- 100µF
|
GND
ESP32 Firmware Implementation
Core System Architecture
The ESP32 firmware follows a modular architecture that separates concerns into distinct functional units. The main program loop coordinates between sensor reading, data processing, network communication, and power management functions. This separation ensures maintainable code and facilitates future enhancements or modifications.
The firmware implements a state machine approach where the device cycles through different operational states including initialization, sensor reading, data transmission, and sleep modes. This approach optimizes power consumption while ensuring reliable data collection and transmission.
WiFi Network Management
The WiFi connection management system handles network connectivity with automatic reconnection capabilities. The system stores network credentials in non-volatile memory and attempts to connect to the configured network during startup. If the connection fails, the system implements an exponential backoff strategy to avoid overwhelming the network infrastructure.
#include <WiFi.h>
#include <HTTPClient.h>
#include <ArduinoJson.h>
#include <EEPROM.h>
#include <esp_system.h>
// Network configuration constants
const char* WIFI_SSID = "YourNetworkName";
const char* WIFI_PASSWORD = "YourNetworkPassword";
const char* SERVER_URL = "http://your-server.com/api/sensor-data";
// Device configuration
const int SENSOR_PIN = 34;
const int MEASUREMENT_INTERVAL = 600000; // 10 minutes in milliseconds
const int MAX_RETRY_ATTEMPTS = 3;
const int WIFI_TIMEOUT = 10000; // 10 seconds
// Global variables for device operation
String deviceUUID;
unsigned long lastMeasurementTime = 0;
bool wifiConnected = false;
class WiFiManager {
private:
int connectionAttempts;
unsigned long lastConnectionAttempt;
public:
WiFiManager() : connectionAttempts(0), lastConnectionAttempt(0) {}
bool initializeConnection() {
WiFi.mode(WIFI_STA);
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
Serial.println("Connecting to WiFi network...");
unsigned long startTime = millis();
while (WiFi.status() != WL_CONNECTED &&
(millis() - startTime) < WIFI_TIMEOUT) {
delay(500);
Serial.print(".");
}
if (WiFi.status() == WL_CONNECTED) {
Serial.println("\nWiFi connected successfully");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
connectionAttempts = 0;
return true;
} else {
Serial.println("\nWiFi connection failed");
connectionAttempts++;
return false;
}
}
bool isConnected() {
return WiFi.status() == WL_CONNECTED;
}
bool reconnectIfNeeded() {
if (!isConnected()) {
Serial.println("WiFi disconnected, attempting reconnection...");
return initializeConnection();
}
return true;
}
void handleConnectionFailure() {
if (connectionAttempts >= MAX_RETRY_ATTEMPTS) {
Serial.println("Maximum connection attempts reached, entering deep sleep");
esp_deep_sleep(MEASUREMENT_INTERVAL * 1000);
}
}
};
The WiFiManager class encapsulates all network-related functionality, providing clean interfaces for connection management. The class implements retry logic with configurable timeouts to handle temporary network issues gracefully. When maximum retry attempts are reached, the device enters deep sleep mode to conserve power and avoid continuous failed connection attempts.
Sensor Data Acquisition System
The sensor data acquisition system implements calibrated readings from the MQ-135 air quality sensor. The system performs multiple readings and applies statistical filtering to reduce noise and improve measurement accuracy. The implementation includes temperature compensation and baseline correction to account for environmental variations.
class AirQualitySensor {
private:
int sensorPin;
float baselineResistance;
int warmupTime;
bool isWarmedUp;
// Calibration constants for MQ-135 sensor
static constexpr float RLOAD = 10.0; // Load resistance in kΩ
static constexpr float RZERO = 76.63; // Sensor resistance in clean air
static constexpr float PARA = 116.6020682; // Calibration parameter
static constexpr float PARB = 2.769034857; // Calibration parameter
public:
AirQualitySensor(int pin) : sensorPin(pin), isWarmedUp(false), warmupTime(20000) {
pinMode(sensorPin, INPUT);
baselineResistance = 0.0;
}
void initialize() {
Serial.println("Initializing air quality sensor...");
// Allow sensor to warm up
unsigned long startTime = millis();
while ((millis() - startTime) < warmupTime) {
delay(1000);
Serial.print("Warming up sensor... ");
Serial.print((millis() - startTime) / 1000);
Serial.println(" seconds");
}
// Calculate baseline resistance in clean air
baselineResistance = calculateResistance();
isWarmedUp = true;
Serial.print("Sensor initialization complete. Baseline resistance: ");
Serial.print(baselineResistance);
Serial.println(" kΩ");
}
float readRawValue() {
if (!isWarmedUp) {
Serial.println("Warning: Sensor not properly warmed up");
return -1.0;
}
// Take multiple readings for averaging
const int numReadings = 10;
float totalValue = 0.0;
for (int i = 0; i < numReadings; i++) {
int analogValue = analogRead(sensorPin);
float voltage = (analogValue / 4095.0) * 3.3; // Convert to voltage
float resistance = calculateResistanceFromVoltage(voltage);
totalValue += resistance;
delay(100); // Small delay between readings
}
return totalValue / numReadings;
}
float calculatePPM() {
float resistance = readRawValue();
if (resistance < 0) {
return -1.0; // Error condition
}
// Calculate ratio of current resistance to baseline
float ratio = resistance / RZERO;
// Convert resistance ratio to PPM using calibration curve
float ppm = PARA * pow(ratio, -PARB);
return ppm;
}
private:
float calculateResistance() {
int analogValue = analogRead(sensorPin);
float voltage = (analogValue / 4095.0) * 3.3;
return calculateResistanceFromVoltage(voltage);
}
float calculateResistanceFromVoltage(float voltage) {
if (voltage <= 0.0) {
return 0.0;
}
// Calculate sensor resistance using voltage divider formula
float resistance = ((3.3 - voltage) / voltage) * RLOAD;
return resistance;
}
};
The AirQualitySensor class implements comprehensive sensor management including initialization, calibration, and measurement functions. The class performs statistical averaging across multiple readings to reduce measurement noise and improve data quality. The implementation includes proper sensor warm-up procedures and resistance-to-PPM conversion using calibrated parameters specific to the MQ-135 sensor.
Data Transmission and Communication Protocol
The data transmission system implements a robust HTTP-based communication protocol for sending sensor measurements to the central server. The system formats data in JSON format and includes error handling for network failures and server responses.
class DataTransmissionManager {
private:
String serverURL;
String deviceID;
HTTPClient httpClient;
public:
DataTransmissionManager(const String& url, const String& id)
: serverURL(url), deviceID(id) {}
bool transmitSensorData(float ppmValue, float latitude, float longitude) {
if (ppmValue < 0) {
Serial.println("Invalid sensor reading, skipping transmission");
return false;
}
// Create JSON payload
DynamicJsonDocument jsonDoc(1024);
jsonDoc["device_id"] = deviceID;
jsonDoc["timestamp"] = getCurrentTimestamp();
jsonDoc["air_quality_ppm"] = ppmValue;
jsonDoc["latitude"] = latitude;
jsonDoc["longitude"] = longitude;
jsonDoc["sensor_type"] = "MQ135";
jsonDoc["firmware_version"] = "1.0.0";
String jsonString;
serializeJson(jsonDoc, jsonString);
Serial.println("Transmitting sensor data:");
Serial.println(jsonString);
// Configure HTTP client
httpClient.begin(serverURL);
httpClient.addHeader("Content-Type", "application/json");
httpClient.addHeader("User-Agent", "ESP32-AirQuality-Sensor/1.0");
httpClient.setTimeout(15000); // 15 second timeout
// Send POST request
int httpResponseCode = httpClient.POST(jsonString);
if (httpResponseCode > 0) {
String response = httpClient.getString();
Serial.print("HTTP Response Code: ");
Serial.println(httpResponseCode);
Serial.print("Server Response: ");
Serial.println(response);
httpClient.end();
return (httpResponseCode >= 200 && httpResponseCode < 300);
} else {
Serial.print("HTTP Error: ");
Serial.println(httpClient.errorToString(httpResponseCode));
httpClient.end();
return false;
}
}
private:
unsigned long getCurrentTimestamp() {
// In a production system, this would sync with NTP server
// For simplicity, using millis() as relative timestamp
return millis();
}
};
Device Registration and UUID Management
The device registration system ensures each sensor node has a unique identifier and properly registers with the central server during initial setup. The system stores the UUID in non-volatile memory to maintain consistency across power cycles.
class DeviceManager {
private:
String uuid;
bool isRegistered;
public:
DeviceManager() : isRegistered(false) {}
void initialize() {
// Try to load existing UUID from EEPROM
EEPROM.begin(512);
uuid = loadUUIDFromEEPROM();
if (uuid.length() == 0) {
// Generate new UUID if none exists
uuid = generateUUID();
saveUUIDToEEPROM(uuid);
Serial.println("Generated new device UUID: " + uuid);
} else {
Serial.println("Loaded existing UUID: " + uuid);
}
}
String getDeviceUUID() {
return uuid;
}
bool registerWithServer(float latitude, float longitude) {
HTTPClient httpClient;
String registrationURL = "http://your-server.com/api/register-device";
// Create registration payload
DynamicJsonDocument jsonDoc(512);
jsonDoc["device_id"] = uuid;
jsonDoc["device_type"] = "air_quality_sensor";
jsonDoc["latitude"] = latitude;
jsonDoc["longitude"] = longitude;
jsonDoc["sensor_model"] = "MQ135";
jsonDoc["firmware_version"] = "1.0.0";
String jsonString;
serializeJson(jsonDoc, jsonString);
httpClient.begin(registrationURL);
httpClient.addHeader("Content-Type", "application/json");
int responseCode = httpClient.POST(jsonString);
if (responseCode >= 200 && responseCode < 300) {
Serial.println("Device registered successfully with server");
isRegistered = true;
httpClient.end();
return true;
} else {
Serial.print("Device registration failed. Response code: ");
Serial.println(responseCode);
httpClient.end();
return false;
}
}
private:
String generateUUID() {
// Simple UUID generation based on ESP32 chip ID and random values
uint64_t chipId = ESP.getEfuseMac();
String uuid = String((uint32_t)(chipId >> 32), HEX) +
String((uint32_t)chipId, HEX) +
String(random(0x10000000, 0xFFFFFFFF), HEX);
uuid.toUpperCase();
return uuid;
}
void saveUUIDToEEPROM(const String& uuid) {
for (int i = 0; i < uuid.length() && i < 32; i++) {
EEPROM.write(i, uuid[i]);
}
EEPROM.write(uuid.length(), '\0');
EEPROM.commit();
}
String loadUUIDFromEEPROM() {
String uuid = "";
char ch;
for (int i = 0; i < 32; i++) {
ch = EEPROM.read(i);
if (ch == '\0') break;
uuid += ch;
}
return uuid;
}
};
Main Program Structure and Execution Flow
The main program coordinates all system components and implements the primary execution loop. The program handles initialization, periodic sensor readings, data transmission, and power management.
// Global object instances
WiFiManager wifiManager;
AirQualitySensor airSensor(SENSOR_PIN);
DataTransmissionManager dataManager(SERVER_URL, "");
DeviceManager deviceManager;
void setup() {
Serial.begin(115200);
Serial.println("ESP32 Air Quality Sensor Starting...");
// Initialize random number generator
randomSeed(analogRead(0));
// Initialize device manager and get UUID
deviceManager.initialize();
String deviceUUID = deviceManager.getDeviceUUID();
// Update data manager with device UUID
dataManager = DataTransmissionManager(SERVER_URL, deviceUUID);
// Initialize WiFi connection
if (!wifiManager.initializeConnection()) {
Serial.println("Failed to connect to WiFi, entering deep sleep");
esp_deep_sleep(MEASUREMENT_INTERVAL * 1000);
}
// Register device with server (with hardcoded coordinates for this example)
// In production, GPS module would provide actual coordinates
float deviceLatitude = 40.7128; // Example: New York City
float deviceLongitude = -74.0060;
if (!deviceManager.registerWithServer(deviceLatitude, deviceLongitude)) {
Serial.println("Device registration failed, continuing with measurements");
}
// Initialize air quality sensor
airSensor.initialize();
Serial.println("System initialization complete");
lastMeasurementTime = millis();
}
void loop() {
unsigned long currentTime = millis();
// Check if it's time for next measurement
if (currentTime - lastMeasurementTime >= MEASUREMENT_INTERVAL) {
// Ensure WiFi connection is active
if (!wifiManager.reconnectIfNeeded()) {
wifiManager.handleConnectionFailure();
return;
}
// Read sensor data
Serial.println("Taking air quality measurement...");
float ppmValue = airSensor.calculatePPM();
if (ppmValue >= 0) {
Serial.print("Air quality reading: ");
Serial.print(ppmValue);
Serial.println(" PPM");
// Transmit data to server
float deviceLatitude = 40.7128; // In production, get from GPS
float deviceLongitude = -74.0060;
bool transmissionSuccess = dataManager.transmitSensorData(
ppmValue, deviceLatitude, deviceLongitude);
if (transmissionSuccess) {
Serial.println("Data transmitted successfully");
} else {
Serial.println("Data transmission failed");
}
} else {
Serial.println("Invalid sensor reading, skipping transmission");
}
lastMeasurementTime = currentTime;
}
// Small delay to prevent excessive CPU usage
delay(1000);
}
Central Web Server Implementation
Server Architecture and Technology Stack
The central web server implements a scalable architecture capable of handling multiple concurrent sensor devices while providing real-time data access to end users. The server utilizes Node.js with Express framework for HTTP request handling, MongoDB for data persistence, and implements RESTful API endpoints for device communication and user queries.
The server architecture follows a modular design pattern with separate modules for device management, data processing, geographical calculations, and artificial intelligence-based data analysis. This separation ensures maintainable code and facilitates future enhancements or scaling requirements.
Database Schema and Data Models
The database schema accommodates device registration information, sensor measurements, and user access patterns. The design optimizes for both write-heavy operations from sensor devices and read-heavy operations from user queries.
Javascript
// server.js - Main server application
const express = require('express');
const mongoose = require('mongoose');
const cors = require('cors');
const helmet = require('helmet');
const rateLimit = require('express-rate-limit');
const compression = require('compression');
// Import custom modules
const deviceRoutes = require('./routes/devices');
const dataRoutes = require('./routes/data');
const analyticsRoutes = require('./routes/analytics');
const { connectDatabase } = require('./config/database');
const { logger } = require('./utils/logger');
// Express application setup
const app = express();
const PORT = process.env.PORT || 3000;
// Security middleware
app.use(helmet());
app.use(cors({
origin: process.env.ALLOWED_ORIGINS?.split(',') || ['http://localhost:3000'],
credentials: true
}));
// Rate limiting configuration
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // Limit each IP to 100 requests per windowMs
message: 'Too many requests from this IP, please try again later.'
});
app.use('/api/', limiter);
// Compression and parsing middleware
app.use(compression());
app.use(express.json({ limit: '10mb' }));
app.use(express.urlencoded({ extended: true }));
// Request logging middleware
app.use((req, res, next) => {
logger.info(`${req.method} ${req.path} - ${req.ip}`);
next();
});
// API routes
app.use('/api/devices', deviceRoutes);
app.use('/api/data', dataRoutes);
app.use('/api/analytics', analyticsRoutes);
// Health check endpoint
app.get('/health', (req, res) => {
res.status(200).json({
status: 'healthy',
timestamp: new Date().toISOString(),
uptime: process.uptime()
});
});
// Error handling middleware
app.use((error, req, res, next) => {
logger.error('Unhandled error:', error);
res.status(500).json({
error: 'Internal server error',
message: process.env.NODE_ENV === 'development' ? error.message : 'Something went wrong'
});
});
// 404 handler
app.use('*', (req, res) => {
res.status(404).json({
error: 'Not found',
message: 'The requested resource was not found'
});
});
// Database connection and server startup
async function startServer() {
try {
await connectDatabase();
logger.info('Database connected successfully');
app.listen(PORT, () => {
logger.info(`Air Quality Monitoring Server running on port ${PORT}`);
logger.info(`Environment: ${process.env.NODE_ENV || 'development'}`);
});
} catch (error) {
logger.error('Failed to start server:', error);
process.exit(1);
}
}
// Graceful shutdown handling
process.on('SIGTERM', () => {
logger.info('SIGTERM received, shutting down gracefully');
process.exit(0);
});
process.on('SIGINT', () => {
logger.info('SIGINT received, shutting down gracefully');
process.exit(0);
});
startServer();
Database Models and Schema Definitions
The database models define the structure for storing device information, sensor measurements, and analytical results. The schema includes proper indexing for efficient geographical queries and time-based data retrieval.
Javascript
// models/Device.js - Device registration model
const mongoose = require('mongoose');
const deviceSchema = new mongoose.Schema({
deviceId: {
type: String,
required: true,
unique: true,
index: true,
trim: true,
maxlength: 64
},
deviceType: {
type: String,
required: true,
enum: ['air_quality_sensor', 'weather_station', 'noise_monitor'],
default: 'air_quality_sensor'
},
location: {
type: {
type: String,
enum: ['Point'],
required: true,
default: 'Point'
},
coordinates: {
type: [Number],
required: true,
validate: {
validator: function(coords) {
return coords.length === 2 &&
coords[0] >= -180 && coords[0] <= 180 && // longitude
coords[1] >= -90 && coords[1] <= 90; // latitude
},
message: 'Invalid coordinates format'
}
}
},
sensorModel: {
type: String,
required: true,
trim: true,
maxlength: 50
},
firmwareVersion: {
type: String,
required: true,
trim: true,
maxlength: 20
},
registrationDate: {
type: Date,
default: Date.now,
index: true
},
lastActivity: {
type: Date,
default: Date.now,
index: true
},
isActive: {
type: Boolean,
default: true,
index: true
},
calibrationData: {
baselineResistance: Number,
calibrationDate: Date,
calibrationCoefficients: [Number]
}
}, {
timestamps: true
});
// Create geospatial index for location-based queries
deviceSchema.index({ location: '2dsphere' });
// Index for efficient device lookup
deviceSchema.index({ deviceId: 1, isActive: 1 });
// Update last activity timestamp on save
deviceSchema.pre('save', function(next) {
this.lastActivity = new Date();
next();
});
// Instance methods
deviceSchema.methods.updateActivity = function() {
this.lastActivity = new Date();
return this.save();
};
deviceSchema.methods.deactivate = function() {
this.isActive = false;
return this.save();
};
// Static methods
deviceSchema.statics.findActiveDevices = function() {
return this.find({ isActive: true });
};
deviceSchema.statics.findDevicesInRadius = function(longitude, latitude, radiusMeters) {
return this.find({
location: {
$geoWithin: {
$centerSphere: [[longitude, latitude], radiusMeters / 6378100]
}
},
isActive: true
});
};
module.exports = mongoose.model('Device', deviceSchema);
Javascript
// models/SensorData.js - Sensor measurement model
const mongoose = require('mongoose');
const sensorDataSchema = new mongoose.Schema({
deviceId: {
type: String,
required: true,
index: true,
ref: 'Device'
},
timestamp: {
type: Date,
required: true,
index: true,
default: Date.now
},
location: {
type: {
type: String,
enum: ['Point'],
required: true,
default: 'Point'
},
coordinates: {
type: [Number],
required: true
}
},
measurements: {
airQualityPPM: {
type: Number,
required: true,
min: 0,
max: 10000,
validate: {
validator: function(value) {
return !isNaN(value) && isFinite(value);
},
message: 'Air quality PPM must be a valid number'
}
},
temperature: {
type: Number,
min: -50,
max: 100
},
humidity: {
type: Number,
min: 0,
max: 100
},
pressure: {
type: Number,
min: 800,
max: 1200
}
},
sensorType: {
type: String,
required: true,
enum: ['MQ135', 'MQ7', 'BME280', 'DHT22'],
default: 'MQ135'
},
firmwareVersion: {
type: String,
required: true
},
dataQuality: {
type: String,
enum: ['excellent', 'good', 'fair', 'poor'],
default: 'good'
},
processed: {
type: Boolean,
default: false,
index: true
}
}, {
timestamps: true
});
// Compound indexes for efficient queries
sensorDataSchema.index({ deviceId: 1, timestamp: -1 });
sensorDataSchema.index({ timestamp: -1, processed: 1 });
sensorDataSchema.index({ location: '2dsphere', timestamp: -1 });
// Time-based partitioning index
sensorDataSchema.index({
timestamp: -1
}, {
expireAfterSeconds: 60 * 60 * 24 * 365 // 1 year retention
});
// Static methods for data analysis
sensorDataSchema.statics.getAverageInRadius = async function(longitude, latitude, radiusMeters, startTime, endTime) {
const pipeline = [
{
$match: {
location: {
$geoWithin: {
$centerSphere: [[longitude, latitude], radiusMeters / 6378100]
}
},
timestamp: {
$gte: startTime,
$lte: endTime
}
}
},
{
$group: {
_id: null,
averageAirQuality: { $avg: '$measurements.airQualityPPM' },
averageTemperature: { $avg: '$measurements.temperature' },
averageHumidity: { $avg: '$measurements.humidity' },
sampleCount: { $sum: 1 },
deviceCount: { $addToSet: '$deviceId' }
}
},
{
$project: {
_id: 0,
averageAirQuality: { $round: ['$averageAirQuality', 2] },
averageTemperature: { $round: ['$averageTemperature', 2] },
averageHumidity: { $round: ['$averageHumidity', 2] },
sampleCount: 1,
deviceCount: { $size: '$deviceCount' }
}
}
];
const result = await this.aggregate(pipeline);
return result[0] || null;
};
sensorDataSchema.statics.getTimeSeriesData = function(deviceId, startTime, endTime, interval = '1h') {
const groupBy = {
'1m': { $dateToString: { format: '%Y-%m-%d %H:%M', date: '$timestamp' } },
'1h': { $dateToString: { format: '%Y-%m-%d %H:00', date: '$timestamp' } },
'1d': { $dateToString: { format: '%Y-%m-%d', date: '$timestamp' } }
};
return this.aggregate([
{
$match: {
deviceId: deviceId,
timestamp: { $gte: startTime, $lte: endTime }
}
},
{
$group: {
_id: groupBy[interval] || groupBy['1h'],
averageAirQuality: { $avg: '$measurements.airQualityPPM' },
minAirQuality: { $min: '$measurements.airQualityPPM' },
maxAirQuality: { $max: '$measurements.airQualityPPM' },
sampleCount: { $sum: 1 }
}
},
{
$sort: { _id: 1 }
}
]);
};
module.exports = mongoose.model('SensorData', sensorDataSchema);
Device Registration and Management API
The device registration system handles new device onboarding, location updates, and device status management. The API validates device information and maintains an active device registry.
Javascript
// routes/devices.js - Device management routes
const express = require('express');
const Device = require('../models/Device');
const { logger } = require('../utils/logger');
const { validateDeviceRegistration } = require('../middleware/validation');
const router = express.Router();
// Register new device
router.post('/register', validateDeviceRegistration, async (req, res) => {
try {
const {
device_id,
device_type,
latitude,
longitude,
sensor_model,
firmware_version
} = req.body;
// Check if device already exists
const existingDevice = await Device.findOne({ deviceId: device_id });
if (existingDevice) {
// Update existing device information
existingDevice.location.coordinates = [longitude, latitude];
existingDevice.sensorModel = sensor_model;
existingDevice.firmwareVersion = firmware_version;
existingDevice.isActive = true;
await existingDevice.save();
logger.info(`Device ${device_id} updated registration`);
return res.status(200).json({
success: true,
message: 'Device registration updated successfully',
deviceId: device_id
});
}
// Create new device registration
const newDevice = new Device({
deviceId: device_id,
deviceType: device_type || 'air_quality_sensor',
location: {
type: 'Point',
coordinates: [longitude, latitude]
},
sensorModel: sensor_model,
firmwareVersion: firmware_version,
registrationDate: new Date(),
isActive: true
});
await newDevice.save();
logger.info(`New device registered: ${device_id}`);
res.status(201).json({
success: true,
message: 'Device registered successfully',
deviceId: device_id
});
} catch (error) {
logger.error('Device registration error:', error);
res.status(500).json({
success: false,
error: 'Device registration failed',
message: error.message
});
}
});
// Get device information
router.get('/:deviceId', async (req, res) => {
try {
const { deviceId } = req.params;
const device = await Device.findOne({ deviceId: deviceId });
if (!device) {
return res.status(404).json({
success: false,
error: 'Device not found'
});
}
res.json({
success: true,
device: {
deviceId: device.deviceId,
deviceType: device.deviceType,
location: device.location,
sensorModel: device.sensorModel,
firmwareVersion: device.firmwareVersion,
registrationDate: device.registrationDate,
lastActivity: device.lastActivity,
isActive: device.isActive
}
});
} catch (error) {
logger.error('Get device error:', error);
res.status(500).json({
success: false,
error: 'Failed to retrieve device information'
});
}
});
// List all active devices
router.get('/', async (req, res) => {
try {
const { page = 1, limit = 50, type } = req.query;
const query = { isActive: true };
if (type) {
query.deviceType = type;
}
const devices = await Device.find(query)
.select('deviceId deviceType location sensorModel lastActivity')
.limit(limit * 1)
.skip((page - 1) * limit)
.sort({ lastActivity: -1 });
const totalDevices = await Device.countDocuments(query);
res.json({
success: true,
devices: devices,
pagination: {
currentPage: parseInt(page),
totalPages: Math.ceil(totalDevices / limit),
totalDevices: totalDevices,
hasNextPage: page * limit < totalDevices,
hasPrevPage: page > 1
}
});
} catch (error) {
logger.error('List devices error:', error);
res.status(500).json({
success: false,
error: 'Failed to retrieve device list'
});
}
});
// Update device location
router.put('/:deviceId/location', async (req, res) => {
try {
const { deviceId } = req.params;
const { latitude, longitude } = req.body;
if (!latitude || !longitude) {
return res.status(400).json({
success: false,
error: 'Latitude and longitude are required'
});
}
const device = await Device.findOne({ deviceId: deviceId });
if (!device) {
return res.status(404).json({
success: false,
error: 'Device not found'
});
}
device.location.coordinates = [longitude, latitude];
await device.save();
logger.info(`Device ${deviceId} location updated`);
res.json({
success: true,
message: 'Device location updated successfully'
});
} catch (error) {
logger.error('Update device location error:', error);
res.status(500).json({
success: false,
error: 'Failed to update device location'
});
}
});
// Deactivate device
router.delete('/:deviceId', async (req, res) => {
try {
const { deviceId } = req.params;
const device = await Device.findOne({ deviceId: deviceId });
if (!device) {
return res.status(404).json({
success: false,
error: 'Device not found'
});
}
await device.deactivate();
logger.info(`Device ${deviceId} deactivated`);
res.json({
success: true,
message: 'Device deactivated successfully'
});
} catch (error) {
logger.error('Deactivate device error:', error);
res.status(500).json({
success: false,
error: 'Failed to deactivate device'
});
}
});
module.exports = router;
Sensor Data Ingestion API
The data ingestion system receives sensor measurements from ESP32 devices, validates the data, and stores it in the database with proper indexing for efficient retrieval.
Javascript
// routes/data.js - Sensor data ingestion and retrieval routes
const express = require('express');
const SensorData = require('../models/SensorData');
const Device = require('../models/Device');
const { logger } = require('../utils/logger');
const { validateSensorData } = require('../middleware/validation');
const { calculateAirQualityIndex } = require('../utils/airQuality');
const router = express.Router();
// Receive sensor data from devices
router.post('/sensor-data', validateSensorData, async (req, res) => {
try {
const {
device_id,
timestamp,
air_quality_ppm,
latitude,
longitude,
sensor_type,
firmware_version,
temperature,
humidity
} = req.body;
// Verify device exists and is active
const device = await Device.findOne({
deviceId: device_id,
isActive: true
});
if (!device) {
return res.status(404).json({
success: false,
error: 'Device not found or inactive'
});
}
// Update device last activity
await device.updateActivity();
// Create sensor data record
const sensorData = new SensorData({
deviceId: device_id,
timestamp: timestamp ? new Date(timestamp) : new Date(),
location: {
type: 'Point',
coordinates: [longitude, latitude]
},
measurements: {
airQualityPPM: air_quality_ppm,
temperature: temperature,
humidity: humidity
},
sensorType: sensor_type || 'MQ135',
firmwareVersion: firmware_version,
dataQuality: calculateDataQuality(air_quality_ppm, temperature, humidity)
});
await sensorData.save();
logger.info(`Sensor data received from device ${device_id}: ${air_quality_ppm} PPM`);
res.status(201).json({
success: true,
message: 'Sensor data received successfully',
dataId: sensorData._id,
airQualityIndex: calculateAirQualityIndex(air_quality_ppm)
});
} catch (error) {
logger.error('Sensor data ingestion error:', error);
res.status(500).json({
success: false,
error: 'Failed to process sensor data',
message: error.message
});
}
});
// Get aggregated data for a geographical area
router.get('/area-average', async (req, res) => {
try {
const {
latitude,
longitude,
radius = 1000, // Default 1km radius
start_time,
end_time,
interval = '1h'
} = req.query;
if (!latitude || !longitude) {
return res.status(400).json({
success: false,
error: 'Latitude and longitude are required'
});
}
const lat = parseFloat(latitude);
const lng = parseFloat(longitude);
const radiusMeters = parseInt(radius);
// Default time range: last 24 hours
const endTime = end_time ? new Date(end_time) : new Date();
const startTime = start_time ? new Date(start_time) :
new Date(endTime.getTime() - 24 * 60 * 60 * 1000);
// Get average data for the area
const averageData = await SensorData.getAverageInRadius(
lng, lat, radiusMeters, startTime, endTime
);
if (!averageData) {
return res.json({
success: true,
message: 'No data available for the specified area and time range',
data: null
});
}
// Calculate air quality index and health recommendations
const airQualityIndex = calculateAirQualityIndex(averageData.averageAirQuality);
const healthRecommendation = getHealthRecommendation(airQualityIndex);
res.json({
success: true,
data: {
location: {
latitude: lat,
longitude: lng,
radius: radiusMeters
},
timeRange: {
startTime: startTime,
endTime: endTime
},
measurements: {
averageAirQuality: averageData.averageAirQuality,
averageTemperature: averageData.averageTemperature,
averageHumidity: averageData.averageHumidity,
airQualityIndex: airQualityIndex,
healthRecommendation: healthRecommendation
},
statistics: {
sampleCount: averageData.sampleCount,
deviceCount: averageData.deviceCount
}
}
});
} catch (error) {
logger.error('Area average calculation error:', error);
res.status(500).json({
success: false,
error: 'Failed to calculate area average'
});
}
});
// Get time series data for a specific device
router.get('/device/:deviceId/timeseries', async (req, res) => {
try {
const { deviceId } = req.params;
const {
start_time,
end_time,
interval = '1h'
} = req.query;
// Verify device exists
const device = await Device.findOne({ deviceId: deviceId });
if (!device) {
return res.status(404).json({
success: false,
error: 'Device not found'
});
}
// Default time range: last 7 days
const endTime = end_time ? new Date(end_time) : new Date();
const startTime = start_time ? new Date(start_time) :
new Date(endTime.getTime() - 7 * 24 * 60 * 60 * 1000);
const timeSeriesData = await SensorData.getTimeSeriesData(
deviceId, startTime, endTime, interval
);
res.json({
success: true,
deviceId: deviceId,
timeRange: {
startTime: startTime,
endTime: endTime,
interval: interval
},
data: timeSeriesData
});
} catch (error) {
logger.error('Time series data error:', error);
res.status(500).json({
success: false,
error: 'Failed to retrieve time series data'
});
}
});
// Helper functions
function calculateDataQuality(airQuality, temperature, humidity) {
let qualityScore = 100;
// Penalize extreme or unrealistic values
if (airQuality < 0 || airQuality > 1000) qualityScore -= 50;
if (temperature && (temperature < -40 || temperature > 60)) qualityScore -= 30;
if (humidity && (humidity < 0 || humidity > 100)) qualityScore -= 30;
if (qualityScore >= 90) return 'excellent';
if (qualityScore >= 70) return 'good';
if (qualityScore >= 50) return 'fair';
return 'poor';
}
function getHealthRecommendation(airQualityIndex) {
if (airQualityIndex <= 50) {
return 'Air quality is good. Ideal for outdoor activities.';
} else if (airQualityIndex <= 100) {
return 'Air quality is moderate. Sensitive individuals should consider limiting outdoor activities.';
} else if (airQualityIndex <= 150) {
return 'Air quality is unhealthy for sensitive groups. Limit outdoor activities if you are sensitive to air pollution.';
} else if (airQualityIndex <= 200) {
return 'Air quality is unhealthy. Everyone should limit outdoor activities.';
} else {
return 'Air quality is very unhealthy. Avoid outdoor activities.';
}
}
module.exports = router;
Artificial Intelligence Data Analysis System
The AI analysis system processes collected sensor data to provide intelligent insights, trend analysis, and predictive capabilities. The system implements machine learning algorithms for pattern recognition and anomaly detection.
Javascript
// routes/analytics.js - AI-powered data analysis routes
const express = require('express');
const SensorData = require('../models/SensorData');
const Device = require('../models/Device');
const { logger } = require('../utils/logger');
const { performAIAnalysis } = require('../services/aiAnalysis');
const router = express.Router();
// AI-powered area analysis
router.get('/ai-analysis', async (req, res) => {
try {
const {
latitude,
longitude,
radius = 2000,
analysis_type = 'comprehensive'
} = req.query;
if (!latitude || !longitude) {
return res.status(400).json({
success: false,
error: 'Latitude and longitude are required'
});
}
const lat = parseFloat(latitude);
const lng = parseFloat(longitude);
const radiusMeters = parseInt(radius);
// Get recent data for AI analysis
const endTime = new Date();
const startTime = new Date(endTime.getTime() - 7 * 24 * 60 * 60 * 1000); // Last 7 days
// Retrieve raw sensor data for analysis
const sensorData = await SensorData.find({
location: {
$geoWithin: {
$centerSphere: [[lng, lat], radiusMeters / 6378100]
}
},
timestamp: {
$gte: startTime,
$lte: endTime
}
}).sort({ timestamp: -1 }).limit(1000);
if (sensorData.length === 0) {
return res.json({
success: true,
message: 'Insufficient data for AI analysis',
analysis: null
});
}
// Perform AI analysis
const aiAnalysis = await performAIAnalysis(sensorData, {
analysisType: analysis_type,
location: { latitude: lat, longitude: lng },
radius: radiusMeters
});
res.json({
success: true,
analysis: aiAnalysis,
dataPoints: sensorData.length,
analysisTimestamp: new Date()
});
} catch (error) {
logger.error('AI analysis error:', error);
res.status(500).json({
success: false,
error: 'Failed to perform AI analysis'
});
}
});
// Trend analysis for specific location
router.get('/trends', async (req, res) => {
try {
const {
latitude,
longitude,
radius = 1000,
period = '30d'
} = req.query;
if (!latitude || !longitude) {
return res.status(400).json({
success: false,
error: 'Latitude and longitude are required'
});
}
const lat = parseFloat(latitude);
const lng = parseFloat(longitude);
const radiusMeters = parseInt(radius);
// Calculate time range based on period
const endTime = new Date();
let startTime;
switch (period) {
case '7d':
startTime = new Date(endTime.getTime() - 7 * 24 * 60 * 60 * 1000);
break;
case '30d':
startTime = new Date(endTime.getTime() - 30 * 24 * 60 * 60 * 1000);
break;
case '90d':
startTime = new Date(endTime.getTime() - 90 * 24 * 60 * 60 * 1000);
break;
default:
startTime = new Date(endTime.getTime() - 30 * 24 * 60 * 60 * 1000);
}
// Aggregate data by day for trend analysis
const trendData = await SensorData.aggregate([
{
$match: {
location: {
$geoWithin: {
$centerSphere: [[lng, lat], radiusMeters / 6378100]
}
},
timestamp: {
$gte: startTime,
$lte: endTime
}
}
},
{
$group: {
_id: {
$dateToString: { format: '%Y-%m-%d', date: '$timestamp' }
},
averageAirQuality: { $avg: '$measurements.airQualityPPM' },
minAirQuality: { $min: '$measurements.airQualityPPM' },
maxAirQuality: { $max: '$measurements.airQualityPPM' },
sampleCount: { $sum: 1 }
}
},
{
$sort: { _id: 1 }
}
]);
// Calculate trend statistics
const trendAnalysis = calculateTrendStatistics(trendData);
res.json({
success: true,
location: {
latitude: lat,
longitude: lng,
radius: radiusMeters
},
period: period,
trendData: trendData,
trendAnalysis: trendAnalysis
});
} catch (error) {
logger.error('Trend analysis error:', error);
res.status(500).json({
success: false,
error: 'Failed to perform trend analysis'
});
}
});
// Anomaly detection
router.get('/anomalies', async (req, res) => {
try {
const {
latitude,
longitude,
radius = 1000,
sensitivity = 'medium'
} = req.query;
if (!latitude || !longitude) {
return res.status(400).json({
success: false,
error: 'Latitude and longitude are required'
});
}
const lat = parseFloat(latitude);
const lng = parseFloat(longitude);
const radiusMeters = parseInt(radius);
// Get recent data for anomaly detection
const endTime = new Date();
const startTime = new Date(endTime.getTime() - 24 * 60 * 60 * 1000); // Last 24 hours
const recentData = await SensorData.find({
location: {
$geoWithin: {
$centerSphere: [[lng, lat], radiusMeters / 6378100]
}
},
timestamp: {
$gte: startTime,
$lte: endTime
}
}).sort({ timestamp: -1 });
// Detect anomalies using statistical methods
const anomalies = detectAnomalies(recentData, sensitivity);
res.json({
success: true,
location: {
latitude: lat,
longitude: lng,
radius: radiusMeters
},
timeRange: {
startTime: startTime,
endTime: endTime
},
anomalies: anomalies,
sensitivity: sensitivity
});
} catch (error) {
logger.error('Anomaly detection error:', error);
res.status(500).json({
success: false,
error: 'Failed to detect anomalies'
});
}
});
// Helper functions for analysis
function calculateTrendStatistics(trendData) {
if (trendData.length < 2) {
return {
trend: 'insufficient_data',
changeRate: 0,
volatility: 0
};
}
const values = trendData.map(d => d.averageAirQuality);
const firstValue = values[0];
const lastValue = values[values.length - 1];
// Calculate overall trend
const changeRate = ((lastValue - firstValue) / firstValue) * 100;
// Calculate volatility (standard deviation)
const mean = values.reduce((sum, val) => sum + val, 0) / values.length;
const variance = values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / values.length;
const volatility = Math.sqrt(variance);
let trend;
if (Math.abs(changeRate) < 5) {
trend = 'stable';
} else if (changeRate > 0) {
trend = 'increasing';
} else {
trend = 'decreasing';
}
return {
trend: trend,
changeRate: Math.round(changeRate * 100) / 100,
volatility: Math.round(volatility * 100) / 100,
mean: Math.round(mean * 100) / 100
};
}
function detectAnomalies(data, sensitivity) {
if (data.length < 10) {
return [];
}
const values = data.map(d => d.measurements.airQualityPPM);
const mean = values.reduce((sum, val) => sum + val, 0) / values.length;
const variance = values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / values.length;
const stdDev = Math.sqrt(variance);
// Set threshold based on sensitivity
let threshold;
switch (sensitivity) {
case 'low':
threshold = 3;
break;
case 'medium':
threshold = 2.5;
break;
case 'high':
threshold = 2;
break;
default:
threshold = 2.5;
}
const anomalies = [];
data.forEach((point, index) => {
const value = point.measurements.airQualityPPM;
const zScore = Math.abs((value - mean) / stdDev);
if (zScore > threshold) {
anomalies.push({
timestamp: point.timestamp,
deviceId: point.deviceId,
value: value,
zScore: Math.round(zScore * 100) / 100,
severity: zScore > 3 ? 'high' : 'medium',
location: point.location
});
}
});
return anomalies.sort((a, b) => new Date(b.timestamp) - new Date(a.timestamp));
}
module.exports = router;
AI Analysis Service Implementation
The AI analysis service implements machine learning algorithms for pattern recognition, prediction, and intelligent data interpretation.
Javascript
// services/aiAnalysis.js - AI analysis implementation
const tf = require('@tensorflow/tfjs-node');
const { logger } = require('../utils/logger');
class AIAnalysisService {
constructor() {
this.model = null;
this.isModelLoaded = false;
}
async initializeModel() {
try {
// In a production environment, load a pre-trained model
// For this example, we'll create a simple neural network
this.model = tf.sequential({
layers: [
tf.layers.dense({ inputShape: [4], units: 16, activation: 'relu' }),
tf.layers.dense({ units: 8, activation: 'relu' }),
tf.layers.dense({ units: 1, activation: 'linear' })
]
});
this.model.compile({
optimizer: 'adam',
loss: 'meanSquaredError',
metrics: ['mae']
});
this.isModelLoaded = true;
logger.info('AI model initialized successfully');
} catch (error) {
logger.error('Failed to initialize AI model:', error);
}
}
async performAIAnalysis(sensorData, options = {}) {
try {
if (!this.isModelLoaded) {
await this.initializeModel();
}
const analysis = {
summary: await this.generateSummary(sensorData),
patterns: await this.detectPatterns(sensorData),
predictions: await this.generatePredictions(sensorData),
recommendations: await this.generateRecommendations(sensorData),
riskAssessment: await this.assessRisk(sensorData)
};
return analysis;
} catch (error) {
logger.error('AI analysis failed:', error);
throw error;
}
}
async generateSummary(sensorData) {
const values = sensorData.map(d => d.measurements.airQualityPPM);
const timestamps = sensorData.map(d => new Date(d.timestamp));
const summary = {
totalDataPoints: values.length,
timeSpan: {
start: Math.min(...timestamps),
end: Math.max(...timestamps)
},
statistics: {
mean: this.calculateMean(values),
median: this.calculateMedian(values),
standardDeviation: this.calculateStandardDeviation(values),
minimum: Math.min(...values),
maximum: Math.max(...values)
},
airQualityLevel: this.categorizeAirQuality(this.calculateMean(values))
};
return summary;
}
async detectPatterns(sensorData) {
const patterns = {
dailyPattern: await this.analyzeDailyPattern(sensorData),
weeklyPattern: await this.analyzeWeeklyPattern(sensorData),
seasonalTrend: await this.analyzeSeasonalTrend(sensorData),
correlations: await this.analyzeCorrelations(sensorData)
};
return patterns;
}
async analyzeDailyPattern(sensorData) {
const hourlyData = {};
sensorData.forEach(point => {
const hour = new Date(point.timestamp).getHours();
if (!hourlyData[hour]) {
hourlyData[hour] = [];
}
hourlyData[hour].push(point.measurements.airQualityPPM);
});
const hourlyAverages = {};
for (let hour = 0; hour < 24; hour++) {
if (hourlyData[hour] && hourlyData[hour].length > 0) {
hourlyAverages[hour] = this.calculateMean(hourlyData[hour]);
} else {
hourlyAverages[hour] = null;
}
}
// Identify peak pollution hours
const validHours = Object.entries(hourlyAverages)
.filter(([hour, avg]) => avg !== null)
.map(([hour, avg]) => ({ hour: parseInt(hour), average: avg }));
const peakHour = validHours.reduce((max, current) =>
current.average > max.average ? current : max, validHours[0]);
const lowHour = validHours.reduce((min, current) =>
current.average < min.average ? current : min, validHours[0]);
return {
hourlyAverages: hourlyAverages,
peakPollutionHour: peakHour,
lowestPollutionHour: lowHour,
dailyVariation: peakHour.average - lowHour.average
};
}
async analyzeWeeklyPattern(sensorData) {
const weeklyData = {};
const dayNames = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'];
sensorData.forEach(point => {
const dayOfWeek = new Date(point.timestamp).getDay();
if (!weeklyData[dayOfWeek]) {
weeklyData[dayOfWeek] = [];
}
weeklyData[dayOfWeek].push(point.measurements.airQualityPPM);
});
const weeklyAverages = {};
for (let day = 0; day < 7; day++) {
if (weeklyData[day] && weeklyData[day].length > 0) {
weeklyAverages[dayNames[day]] = this.calculateMean(weeklyData[day]);
}
}
return {
weeklyAverages: weeklyAverages,
weekendVsWeekday: this.compareWeekendVsWeekday(weeklyData)
};
}
async analyzeSeasonalTrend(sensorData) {
// Group data by month
const monthlyData = {};
sensorData.forEach(point => {
const month = new Date(point.timestamp).getMonth();
if (!monthlyData[month]) {
monthlyData[month] = [];
}
monthlyData[month].push(point.measurements.airQualityPPM);
});
const monthlyAverages = {};
const monthNames = [
'January', 'February', 'March', 'April', 'May', 'June',
'July', 'August', 'September', 'October', 'November', 'December'
];
for (let month = 0; month < 12; month++) {
if (monthlyData[month] && monthlyData[month].length > 0) {
monthlyAverages[monthNames[month]] = this.calculateMean(monthlyData[month]);
}
}
return {
monthlyAverages: monthlyAverages,
seasonalVariation: this.calculateSeasonalVariation(monthlyAverages)
};
}
async analyzeCorrelations(sensorData) {
// Analyze correlations between air quality and other factors
const correlations = {};
if (sensorData.some(d => d.measurements.temperature !== undefined)) {
correlations.temperatureCorrelation = this.calculateCorrelation(
sensorData.map(d => d.measurements.airQualityPPM),
sensorData.map(d => d.measurements.temperature).filter(t => t !== undefined)
);
}
if (sensorData.some(d => d.measurements.humidity !== undefined)) {
correlations.humidityCorrelation = this.calculateCorrelation(
sensorData.map(d => d.measurements.airQualityPPM),
sensorData.map(d => d.measurements.humidity).filter(h => h !== undefined)
);
}
return correlations;
}
async generatePredictions(sensorData) {
if (sensorData.length < 10) {
return {
error: 'Insufficient data for predictions',
predictions: null
};
}
// Simple time series prediction using moving average
const values = sensorData.map(d => d.measurements.airQualityPPM);
const recentValues = values.slice(-10); // Last 10 values
const movingAverage = this.calculateMean(recentValues);
// Calculate trend
const firstHalf = recentValues.slice(0, 5);
const secondHalf = recentValues.slice(5);
const trend = this.calculateMean(secondHalf) - this.calculateMean(firstHalf);
const predictions = [];
for (let i = 1; i <= 6; i++) { // Predict next 6 hours
const predictedValue = movingAverage + (trend * i);
predictions.push({
hoursAhead: i,
predictedValue: Math.max(0, predictedValue),
confidence: Math.max(0.1, 1 - (i * 0.15)) // Decreasing confidence
});
}
return {
method: 'moving_average_with_trend',
predictions: predictions,
baseValue: movingAverage,
trend: trend
};
}
async generateRecommendations(sensorData) {
const currentLevel = this.calculateMean(
sensorData.slice(-5).map(d => d.measurements.airQualityPPM)
);
const recommendations = [];
if (currentLevel > 150) {
recommendations.push({
priority: 'high',
category: 'health',
message: 'Air quality is unhealthy. Avoid outdoor activities and keep windows closed.'
});
recommendations.push({
priority: 'high',
category: 'action',
message: 'Consider using air purifiers indoors and wearing masks when going outside.'
});
} else if (currentLevel > 100) {
recommendations.push({
priority: 'medium',
category: 'health',
message: 'Air quality is moderate. Sensitive individuals should limit outdoor activities.'
});
} else {
recommendations.push({
priority: 'low',
category: 'health',
message: 'Air quality is good. Safe for all outdoor activities.'
});
}
// Add monitoring recommendations
if (sensorData.length < 50) {
recommendations.push({
priority: 'medium',
category: 'monitoring',
message: 'More data collection recommended for better analysis accuracy.'
});
}
return recommendations;
}
async assessRisk(sensorData) {
const values = sensorData.map(d => d.measurements.airQualityPPM);
const currentLevel = this.calculateMean(values.slice(-5));
const maxLevel = Math.max(...values);
const volatility = this.calculateStandardDeviation(values);
let riskLevel;
let riskScore = 0;
// Base risk on current level
if (currentLevel > 200) riskScore += 40;
else if (currentLevel > 150) riskScore += 30;
else if (currentLevel > 100) riskScore += 20;
else if (currentLevel > 50) riskScore += 10;
// Add risk based on maximum observed level
if (maxLevel > 300) riskScore += 30;
else if (maxLevel > 200) riskScore += 20;
else if (maxLevel > 150) riskScore += 10;
// Add risk based on volatility
if (volatility > 50) riskScore += 20;
else if (volatility > 30) riskScore += 10;
// Determine risk level
if (riskScore >= 70) riskLevel = 'very_high';
else if (riskScore >= 50) riskLevel = 'high';
else if (riskScore >= 30) riskLevel = 'medium';
else if (riskScore >= 15) riskLevel = 'low';
else riskLevel = 'very_low';
return {
riskLevel: riskLevel,
riskScore: riskScore,
factors: {
currentLevel: currentLevel,
maxLevel: maxLevel,
volatility: volatility
}
};
}
// Utility methods
calculateMean(values) {
return values.reduce((sum, val) => sum + val, 0) / values.length;
}
calculateMedian(values) {
const sorted = values.slice().sort((a, b) => a - b);
const middle = Math.floor(sorted.length / 2);
if (sorted.length % 2 === 0) {
return (sorted[middle - 1] + sorted[middle]) / 2;
} else {
return sorted[middle];
}
}
calculateStandardDeviation(values) {
const mean = this.calculateMean(values);
const variance = values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / values.length;
return Math.sqrt(variance);
}
calculateCorrelation(x, y) {
if (x.length !== y.length || x.length === 0) return 0;
const meanX = this.calculateMean(x);
const meanY = this.calculateMean(y);
let numerator = 0;
let denomX = 0;
let denomY = 0;
for (let i = 0; i < x.length; i++) {
const deltaX = x[i] - meanX;
const deltaY = y[i] - meanY;
numerator += deltaX * deltaY;
denomX += deltaX * deltaX;
denomY += deltaY * deltaY;
}
const denominator = Math.sqrt(denomX * denomY);
return denominator === 0 ? 0 : numerator / denominator;
}
categorizeAirQuality(ppm) {
if (ppm <= 50) return 'Good';
if (ppm <= 100) return 'Moderate';
if (ppm <= 150) return 'Unhealthy for Sensitive Groups';
if (ppm <= 200) return 'Unhealthy';
if (ppm <= 300) return 'Very Unhealthy';
return 'Hazardous';
}
compareWeekendVsWeekday(weeklyData) {
const weekdayData = [];
const weekendData = [];
for (let day = 0; day < 7; day++) {
if (weeklyData[day]) {
if (day === 0 || day === 6) { // Sunday or Saturday
weekendData.push(...weeklyData[day]);
} else {
weekdayData.push(...weeklyData[day]);
}
}
}
return {
weekdayAverage: weekdayData.length > 0 ? this.calculateMean(weekdayData) : null,
weekendAverage: weekendData.length > 0 ? this.calculateMean(weekendData) : null,
difference: weekdayData.length > 0 && weekendData.length > 0 ?
this.calculateMean(weekdayData) - this.calculateMean(weekendData) : null
};
}
calculateSeasonalVariation(monthlyAverages) {
const values = Object.values(monthlyAverages).filter(v => v !== undefined);
if (values.length === 0) return null;
return {
highestMonth: Object.entries(monthlyAverages).reduce((max, [month, avg]) =>
avg > max.average ? { month, average: avg } : max,
{ month: '', average: -Infinity }),
lowestMonth: Object.entries(monthlyAverages).reduce((min, [month, avg]) =>
avg < min.average ? { month, average: avg } : min,
{ month: '', average: Infinity }),
seasonalRange: Math.max(...values) - Math.min(...values)
};
}
}
// Export singleton instance
const aiAnalysisService = new AIAnalysisService();
async function performAIAnalysis(sensorData, options) {
return await aiAnalysisService.performAIAnalysis(sensorData, options);
}
module.exports = {
performAIAnalysis,
AIAnalysisService
};
System Configuration and Deployment
Database Configuration
The database configuration establishes connection parameters, indexing strategies, and performance optimization settings for MongoDB.
Javascript
// config/database.js - Database configuration
const mongoose = require('mongoose');
const { logger } = require('../utils/logger');
const connectDatabase = async () => {
try {
const mongoURI = process.env.MONGODB_URI || 'mongodb://localhost:27017/air_quality_monitoring';
const options = {
useNewUrlParser: true,
useUnifiedTopology: true,
maxPoolSize: 10,
serverSelectionTimeoutMS: 5000,
socketTimeoutMS: 45000,
family: 4,
bufferCommands: false,
bufferMaxEntries: 0
};
await mongoose.connect(mongoURI, options);
// Set up connection event handlers
mongoose.connection.on('connected', () => {
logger.info('MongoDB connected successfully');
});
mongoose.connection.on('error', (error) => {
logger.error('MongoDB connection error:', error);
});
mongoose.connection.on('disconnected', () => {
logger.warn('MongoDB disconnected');
});
// Graceful shutdown
process.on('SIGINT', async () => {
await mongoose.connection.close();
logger.info('MongoDB connection closed through app termination');
process.exit(0);
});
} catch (error) {
logger.error('Database connection failed:', error);
throw error;
}
};
module.exports = { connectDatabase };
Validation Middleware
The validation middleware ensures data integrity and security by validating incoming requests from sensor devices and user queries.
Javascript
// middleware/validation.js - Request validation middleware
const { body, query, validationResult } = require('express-validator');
const { logger } = require('../utils/logger');
// Device registration validation
const validateDeviceRegistration = [
body('device_id')
.isLength({ min: 8, max: 64 })
.matches(/^[A-Za-z0-9]+$/)
.withMessage('Device ID must be 8-64 alphanumeric characters'),
body('device_type')
.optional()
.isIn(['air_quality_sensor', 'weather_station', 'noise_monitor'])
.withMessage('Invalid device type'),
body('latitude')
.isFloat({ min: -90, max: 90 })
.withMessage('Latitude must be between -90 and 90'),
body('longitude')
.isFloat({ min: -180, max: 180 })
.withMessage('Longitude must be between -180 and 180'),
body('sensor_model')
.isLength({ min: 2, max: 50 })
.withMessage('Sensor model must be 2-50 characters'),
body('firmware_version')
.matches(/^\d+\.\d+\.\d+$/)
.withMessage('Firmware version must be in format x.y.z'),
(req, res, next) => {
const errors = validationResult(req);
if (!errors.isEmpty()) {
logger.warn('Device registration validation failed:', errors.array());
return res.status(400).json({
success: false,
error: 'Validation failed',
details: errors.array()
});
}
next();
}
];
// Sensor data validation
const validateSensorData = [
body('device_id')
.isLength({ min: 8, max: 64 })
.matches(/^[A-Za-z0-9]+$/)
.withMessage('Invalid device ID'),
body('air_quality_ppm')
.isFloat({ min: 0, max: 10000 })
.withMessage('Air quality PPM must be between 0 and 10000'),
body('latitude')
.isFloat({ min: -90, max: 90 })
.withMessage('Invalid latitude'),
body('longitude')
.isFloat({ min: -180, max: 180 })
.withMessage('Invalid longitude'),
body('timestamp')
.optional()
.isISO8601()
.withMessage('Invalid timestamp format'),
body('temperature')
.optional()
.isFloat({ min: -50, max: 100 })
.withMessage('Temperature must be between -50 and 100 Celsius'),
body('humidity')
.optional()
.isFloat({ min: 0, max: 100 })
.withMessage('Humidity must be between 0 and 100 percent'),
body('sensor_type')
.optional()
.isIn(['MQ135', 'MQ7', 'BME280', 'DHT22'])
.withMessage('Invalid sensor type'),
(req, res, next) => {
const errors = validationResult(req);
if (!errors.isEmpty()) {
logger.warn('Sensor data validation failed:', errors.array());
return res.status(400).json({
success: false,
error: 'Invalid sensor data',
details: errors.array()
});
}
next();
}
];
module.exports = {
validateDeviceRegistration,
validateSensorData
};
Utility Functions
The utility functions provide common functionality for air quality calculations, logging, and system operations.
Javascript
// utils/airQuality.js - Air quality calculation utilities
function calculateAirQualityIndex(ppm) {
// Convert PPM to AQI using EPA standards (simplified)
if (ppm <= 12) return Math.round((50 / 12) * ppm);
if (ppm <= 35.4) return Math.round(((100 - 51) / (35.4 - 12.1)) * (ppm - 12.1) + 51);
if (ppm <= 55.4) return Math.round(((150 - 101) / (55.4 - 35.5)) * (ppm - 35.5) + 101);
if (ppm <= 150.4) return Math.round(((200 - 151) / (150.4 - 55.5)) * (ppm - 55.5) + 151);
if (ppm <= 250.4) return Math.round(((300 - 201) / (250.4 - 150.5)) * (ppm - 150.5) + 201);
return Math.round(((500 - 301) / (500.4 - 250.5)) * (ppm - 250.5) + 301);
}
function getAirQualityCategory(aqi) {
if (aqi <= 50) return 'Good';
if (aqi <= 100) return 'Moderate';
if (aqi <= 150) return 'Unhealthy for Sensitive Groups';
if (aqi <= 200) return 'Unhealthy';
if (aqi <= 300) return 'Very Unhealthy';
return 'Hazardous';
}
function getHealthRecommendation(aqi) {
if (aqi <= 50) {
return 'Air quality is satisfactory, and air pollution poses little or no risk.';
} else if (aqi <= 100) {
return 'Air quality is acceptable. However, there may be a risk for some people, particularly those who are unusually sensitive to air pollution.';
} else if (aqi <= 150) {
return 'Members of sensitive groups may experience health effects. The general public is less likely to be affected.';
} else if (aqi <= 200) {
return 'Some members of the general public may experience health effects; members of sensitive groups may experience more serious health effects.';
} else if (aqi <= 300) {
return 'Health alert: The risk of health effects is increased for everyone.';
} else {
return 'Health warning of emergency conditions: everyone is more likely to be affected.';
}
}
module.exports = {
calculateAirQualityIndex,
getAirQualityCategory,
getHealthRecommendation
};
Javascript
// utils/logger.js - Logging utility
const winston = require('winston');
const logger = winston.createLogger({
level: process.env.LOG_LEVEL || 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.errors({ stack: true }),
winston.format.json()
),
defaultMeta: { service: 'air-quality-server' },
transports: [
new winston.transports.File({
filename: 'logs/error.log',
level: 'error',
maxsize: 5242880, // 5MB
maxFiles: 5
}),
new winston.transports.File({
filename: 'logs/combined.log',
maxsize: 5242880, // 5MB
maxFiles: 5
})
]
});
if (process.env.NODE_ENV !== 'production') {
logger.add(new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple()
)
}));
}
module.exports = { logger };
Complete Working Example
This section provides a complete, deployable example of the IoT air pollution monitoring system including all necessary configuration files and deployment scripts.
ESP32 Complete Firmware (C++)
// complete_esp32_firmware.ino - Complete ESP32 air quality sensor firmware
#include <WiFi.h>
#include <HTTPClient.h>
#include <ArduinoJson.h>
#include <EEPROM.h>
#include <esp_system.h>
// Configuration constants - Update these for your deployment
const char* WIFI_SSID = "YourWiFiNetwork";
const char* WIFI_PASSWORD = "YourWiFiPassword";
const char* SERVER_URL = "http://your-server.com/api/data/sensor-data";
const char* REGISTRATION_URL = "http://your-server.com/api/devices/register";
// Hardware configuration
const int SENSOR_PIN = 34;
const int LED_PIN = 2;
const int MEASUREMENT_INTERVAL = 600000; // 10 minutes
const int MAX_RETRY_ATTEMPTS = 3;
const int WIFI_TIMEOUT = 15000;
const int SENSOR_WARMUP_TIME = 30000;
// Device location (in production, use GPS module)
const float DEVICE_LATITUDE = 40.7128; // New York City example
const float DEVICE_LONGITUDE = -74.0060;
// Global variables
String deviceUUID;
unsigned long lastMeasurementTime = 0;
bool systemInitialized = false;
// Sensor calibration constants
const float RLOAD = 10.0;
const float RZERO = 76.63;
const float PARA = 116.6020682;
const float PARB = 2.769034857;
class SystemManager {
private:
bool wifiConnected;
int connectionAttempts;
public:
SystemManager() : wifiConnected(false), connectionAttempts(0) {}
void initialize() {
Serial.begin(115200);
Serial.println("\n=== ESP32 Air Quality Sensor System ===");
Serial.println("Firmware Version: 1.0.0");
Serial.println("Initializing system components...");
// Initialize hardware
pinMode(SENSOR_PIN, INPUT);
pinMode(LED_PIN, OUTPUT);
digitalWrite(LED_PIN, LOW);
// Initialize EEPROM
EEPROM.begin(512);
// Generate or load device UUID
deviceUUID = getOrCreateDeviceUUID();
Serial.println("Device UUID: " + deviceUUID);
// Initialize WiFi
if (initializeWiFi()) {
Serial.println("WiFi initialization successful");
registerDevice();
warmupSensor();
systemInitialized = true;
digitalWrite(LED_PIN, HIGH); // Indicate ready state
} else {
Serial.println("WiFi initialization failed - entering deep sleep");
enterDeepSleep();
}
}
bool initializeWiFi() {
WiFi.mode(WIFI_STA);
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
Serial.print("Connecting to WiFi");
unsigned long startTime = millis();
while (WiFi.status() != WL_CONNECTED &&
(millis() - startTime) < WIFI_TIMEOUT) {
delay(500);
Serial.print(".");
}
if (WiFi.status() == WL_CONNECTED) {
Serial.println("\nWiFi connected successfully");
Serial.print("IP Address: ");
Serial.println(WiFi.localIP());
Serial.print("Signal Strength: ");
Serial.print(WiFi.RSSI());
Serial.println(" dBm");
wifiConnected = true;
connectionAttempts = 0;
return true;
} else {
Serial.println("\nWiFi connection failed");
connectionAttempts++;
wifiConnected = false;
return false;
}
}
bool ensureWiFiConnection() {
if (WiFi.status() != WL_CONNECTED) {
Serial.println("WiFi disconnected, attempting reconnection...");
return initializeWiFi();
}
return true;
}
void registerDevice() {
Serial.println("Registering device with server...");
HTTPClient http;
http.begin(REGISTRATION_URL);
http.addHeader("Content-Type", "application/json");
http.setTimeout(15000);
DynamicJsonDocument doc(512);
doc["device_id"] = deviceUUID;
doc["device_type"] = "air_quality_sensor";
doc["latitude"] = DEVICE_LATITUDE;
doc["longitude"] = DEVICE_LONGITUDE;
doc["sensor_model"] = "MQ135";
doc["firmware_version"] = "1.0.0";
String jsonString;
serializeJson(doc, jsonString);
int httpResponseCode = http.POST(jsonString);
if (httpResponseCode >= 200 && httpResponseCode < 300) {
Serial.println("Device registration successful");
} else {
Serial.print("Device registration failed. HTTP Code: ");
Serial.println(httpResponseCode);
Serial.println("Continuing with measurements...");
}
http.end();
}
void warmupSensor() {
Serial.println("Warming up air quality sensor...");
unsigned long startTime = millis();
while ((millis() - startTime) < SENSOR_WARMUP_TIME) {
unsigned long elapsed = (millis() - startTime) / 1000;
Serial.print("Sensor warmup: ");
Serial.print(elapsed);
Serial.print("/");
Serial.print(SENSOR_WARMUP_TIME / 1000);
Serial.println(" seconds");
// Blink LED during warmup
digitalWrite(LED_PIN, !digitalRead(LED_PIN));
delay(1000);
}
digitalWrite(LED_PIN, HIGH);
Serial.println("Sensor warmup complete");
}
void enterDeepSleep() {
Serial.println("Entering deep sleep mode...");
digitalWrite(LED_PIN, LOW);
esp_deep_sleep(MEASUREMENT_INTERVAL * 1000ULL);
}
bool isSystemReady() {
return systemInitialized && wifiConnected;
}
private:
String getOrCreateDeviceUUID() {
String uuid = loadUUIDFromEEPROM();
if (uuid.length() == 0) {
uuid = generateUUID();
saveUUIDToEEPROM(uuid);
Serial.println("Generated new device UUID");
} else {
Serial.println("Loaded existing UUID from EEPROM");
}
return uuid;
}
String generateUUID() {
uint64_t chipId = ESP.getEfuseMac();
String uuid = String((uint32_t)(chipId >> 32), HEX) +
String((uint32_t)chipId, HEX) +
String(random(0x10000000, 0xFFFFFFFF), HEX);
uuid.toUpperCase();
return uuid;
}
void saveUUIDToEEPROM(const String& uuid) {
for (int i = 0; i < uuid.length() && i < 32; i++) {
EEPROM.write(i, uuid[i]);
}
EEPROM.write(uuid.length(), '\0');
EEPROM.commit();
}
String loadUUIDFromEEPROM() {
String uuid = "";
char ch;
for (int i = 0; i < 32; i++) {
ch = EEPROM.read(i);
if (ch == '\0') break;
uuid += ch;
}
return uuid;
}
};
class AirQualitySensor {
private:
int pin;
bool isWarmedUp;
public:
AirQualitySensor(int sensorPin) : pin(sensorPin), isWarmedUp(true) {}
float readAirQualityPPM() {
const int numReadings = 10;
float totalResistance = 0.0;
// Take multiple readings for averaging
for (int i = 0; i < numReadings; i++) {
int analogValue = analogRead(pin);
float voltage = (analogValue / 4095.0) * 3.3;
float resistance = calculateResistance(voltage);
if (resistance > 0) {
totalResistance += resistance;
}
delay(100);
}
float avgResistance = totalResistance / numReadings;
float ratio = avgResistance / RZERO;
float ppm = PARA * pow(ratio, -PARB);
// Sanity check
if (ppm < 0 || ppm > 1000) {
Serial.println("Warning: Sensor reading out of expected range");
return -1.0;
}
return ppm;
}
private:
float calculateResistance(float voltage) {
if (voltage <= 0.0 || voltage >= 3.3) {
return 0.0;
}
return ((3.3 - voltage) / voltage) * RLOAD;
}
};
class DataTransmitter {
private:
String serverURL;
public:
DataTransmitter(const String& url) : serverURL(url) {}
bool transmitData(const String& deviceId, float ppm, float lat, float lng) {
if (ppm < 0) {
Serial.println("Invalid sensor reading - skipping transmission");
return false;
}
HTTPClient http;
http.begin(serverURL);
http.addHeader("Content-Type", "application/json");
http.addHeader("User-Agent", "ESP32-AirQuality/1.0");
http.setTimeout(20000);
DynamicJsonDocument doc(1024);
doc["device_id"] = deviceId;
doc["timestamp"] = getCurrentTimestamp();
doc["air_quality_ppm"] = ppm;
doc["latitude"] = lat;
doc["longitude"] = lng;
doc["sensor_type"] = "MQ135";
doc["firmware_version"] = "1.0.0";
String jsonString;
serializeJson(doc, jsonString);
Serial.println("Transmitting data to server:");
Serial.println(jsonString);
int httpResponseCode = http.POST(jsonString);
if (httpResponseCode >= 200 && httpResponseCode < 300) {
String response = http.getString();
Serial.print("Transmission successful. Response: ");
Serial.println(response);
http.end();
return true;
} else {
Serial.print("Transmission failed. HTTP Code: ");
Serial.println(httpResponseCode);
if (httpResponseCode > 0) {
Serial.print("Error response: ");
Serial.println(http.getString());
}
http.end();
return false;
}
}
private:
String getCurrentTimestamp() {
// In production, sync with NTP server
return String(millis());
}
};
// Global objects
SystemManager systemManager;
AirQualitySensor airSensor(SENSOR_PIN);
DataTransmitter dataTransmitter(SERVER_URL);
void setup() {
// Initialize random seed
randomSeed(analogRead(0));
// Initialize system
systemManager.initialize();
// Record first measurement time
lastMeasurementTime = millis();
Serial.println("=== System Ready ===");
}
void loop() {
unsigned long currentTime = millis();
// Check if it's time for next measurement
if (currentTime - lastMeasurementTime >= MEASUREMENT_INTERVAL) {
if (!systemManager.isSystemReady()) {
Serial.println("System not ready, skipping measurement");
delay(5000);
return;
}
// Ensure WiFi connection
if (!systemManager.ensureWiFiConnection()) {
Serial.println("WiFi connection failed, entering deep sleep");
systemManager.enterDeepSleep();
return;
}
// Indicate measurement in progress
digitalWrite(LED_PIN, LOW);
Serial.println("\n--- Taking Air Quality Measurement ---");
// Read sensor data
float ppmValue = airSensor.readAirQualityPPM();
if (ppmValue >= 0) {
Serial.print("Air Quality Reading: ");
Serial.print(ppmValue, 2);
Serial.println(" PPM");
// Transmit data
bool success = dataTransmitter.transmitData(
deviceUUID, ppmValue, DEVICE_LATITUDE, DEVICE_LONGITUDE);
if (success) {
Serial.println("Data transmission successful");
// Quick blink to indicate success
for (int i = 0; i < 3; i++) {
digitalWrite(LED_PIN, HIGH);
delay(100);
digitalWrite(LED_PIN, LOW);
delay(100);
}
} else {
Serial.println("Data transmission failed");
// Longer blink to indicate failure
for (int i = 0; i < 5; i++) {
digitalWrite(LED_PIN, HIGH);
delay(200);
digitalWrite(LED_PIN, LOW);
delay(200);
}
}
} else {
Serial.println("Invalid sensor reading - measurement skipped");
}
// Return LED to ready state
digitalWrite(LED_PIN, HIGH);
// Update last measurement time
lastMeasurementTime = currentTime;
Serial.print("Next measurement in ");
Serial.print(MEASUREMENT_INTERVAL / 60000);
Serial.println(" minutes");
}
// Small delay to prevent excessive CPU usage
delay(1000);
// Watchdog-style check - restart if system becomes unresponsive
if (millis() - lastMeasurementTime > MEASUREMENT_INTERVAL * 2) {
Serial.println("System appears unresponsive - restarting");
ESP.restart();
}
}
Complete Server Package Configuration
Json
// package.json - Node.js dependencies
{
"name": "air-quality-monitoring-server",
"version": "1.0.0",
"description": "IoT Air Quality Monitoring System Server",
"main": "server.js",
"scripts": {
"start": "node server.js",
"dev": "nodemon server.js",
"test": "jest",
"lint": "eslint .",
"setup": "node scripts/setup.js"
},
"dependencies": {
"express": "^4.18.2",
"mongoose": "^7.5.0",
"cors": "^2.8.5",
"helmet": "^7.0.0",
"express-rate-limit": "^6.8.1",
"compression": "^1.7.4",
"express-validator": "^7.0.1",
"winston": "^3.10.0",
"@tensorflow/tfjs-node": "^4.10.0",
"dotenv": "^16.3.1"
},
"devDependencies": {
"nodemon": "^3.0.1",
"jest": "^29.6.2",
"eslint": "^8.47.0"
},
"engines": {
"node": ">=16.0.0"
}
}
Bash
#!/bin/bash
# deploy.sh - Deployment script
echo "=== Air Quality Monitoring System Deployment ==="
# Create necessary directories
mkdir -p logs
mkdir -p data
mkdir -p config
# Install dependencies
echo "Installing Node.js dependencies..."
npm install
# Set up environment variables
if [ ! -f .env ]; then
echo "Creating environment configuration..."
cat > .env << EOF
NODE_ENV=production
PORT=3000
MONGODB_URI=mongodb://localhost:27017/air_quality_monitoring
LOG_LEVEL=info
ALLOWED_ORIGINS=http://localhost:3000,https://yourdomain.com
EOF
fi
# Create systemd service file
echo "Creating systemd service..."
sudo tee /etc/systemd/system/air-quality-server.service > /dev/null << EOF
[Unit]
Description=Air Quality Monitoring Server
After=network.target
[Service]
Type=simple
User=ubuntu
WorkingDirectory=/path/to/your/project
ExecStart=/usr/bin/node server.js
Restart=on-failure
RestartSec=10
Environment=NODE_ENV=production
[Install]
WantedBy=multi-user.target
EOF
# Enable and start service
sudo systemctl daemon-reload
sudo systemctl enable air-quality-server
sudo systemctl start air-quality-server
echo "Deployment complete!"
echo "Server status: $(sudo systemctl is-active air-quality-server)"
echo "View logs: sudo journalctl -u air-quality-server -f"
This comprehensive IoT air pollution monitoring system provides a complete solution for distributed environmental monitoring. The ESP32-based sensor nodes continuously collect air quality data and transmit it to a central server that processes the information using artificial intelligence algorithms. The system offers scalable architecture, real-time data analysis, and intelligent insights for environmental monitoring applications. Users can access aggregated data for any geographical area with configurable radius parameters, enabling effective air quality management and public health protection.
No comments:
Post a Comment