Tuesday, December 30, 2025

Building an AI Distributed IoT Air Pollution Monitoring Network



Introduction and System Overview


Air pollution monitoring has become increasingly critical in urban environments where industrial activities, vehicle emissions, and other sources contribute to deteriorating air quality. Traditional monitoring systems often rely on expensive, centralized equipment that provides limited spatial coverage. This article presents a comprehensive solution for building a distributed Internet of Things (IoT) network that enables cost-effective, wide-area air pollution monitoring using multiple remote sensor devices.

The proposed system architecture consists of three primary components working in harmony. First, numerous ESP32-based sensor nodes equipped with air quality sensors continuously collect environmental data from their respective locations. Second, a central web server receives, processes, and stores data from all registered devices while maintaining their geographical information. Third, an intelligent data processing system uses artificial intelligence algorithms to analyze the collected data and provide meaningful insights to end users.

The fundamental operation principle involves each sensor device measuring air pollution parameters at regular intervals and transmitting this data along with its unique identifier to the central server. The server then processes incoming data by grouping measurements from devices within configurable geographical regions and computing average pollution levels for those areas. Users can query the system to obtain real-time air quality information for any specified location and radius.
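The grouping and averaging step described above can be sketched in a few lines. This is a minimal illustration in plain C++, not the server code presented later in this article: the Reading struct, the function names, and the use of the haversine formula with a mean Earth radius of 6,371 km are all assumptions made for the example.

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// One stored measurement (hypothetical field names, for illustration only).
struct Reading {
    double latitude;   // degrees
    double longitude;  // degrees
    double ppm;        // reported pollution level
};

// Great-circle distance in meters between two points (haversine formula).
double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
    const double kEarthRadiusM = 6371000.0;  // assumed mean Earth radius
    const double kDegToRad = 3.14159265358979323846 / 180.0;
    const double dLat = (lat2 - lat1) * kDegToRad;
    const double dLon = (lon2 - lon1) * kDegToRad;
    const double a = std::sin(dLat / 2) * std::sin(dLat / 2) +
                     std::cos(lat1 * kDegToRad) * std::cos(lat2 * kDegToRad) *
                     std::sin(dLon / 2) * std::sin(dLon / 2);
    return 2.0 * kEarthRadiusM * std::asin(std::sqrt(a));
}

// Averages the ppm values of all readings within radiusMeters of the query
// point. Returns false (and leaves outAverage untouched) when no reading
// falls inside the circle.
bool averageInRadius(const std::vector<Reading>& readings, double lat,
                     double lon, double radiusMeters, double& outAverage) {
    assert(radiusMeters >= 0.0);
    double sum = 0.0;
    int count = 0;
    for (const Reading& r : readings) {
        if (haversineMeters(lat, lon, r.latitude, r.longitude) <= radiusMeters) {
            sum += r.ppm;
            ++count;
        }
    }
    if (count == 0) return false;
    outAverage = sum / count;
    return true;
}
```

A query for a 500 m radius around a point would pass the stored readings, the point's coordinates, and 500.0, then check the boolean result before using the average. A production server would normally delegate the distance filter to a geospatial index rather than scanning every reading.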

This distributed approach offers several advantages over traditional monitoring systems. Deploying many low-cost sensors across a wide area yields higher spatial resolution. The network scales easily, since new devices can be added without significant infrastructure changes. Additionally, the redundancy inherent in multiple sensors improves data reliability and system robustness.


Hardware Architecture and Component Selection


ESP32 Microcontroller Platform


The ESP32 serves as the foundation for each sensor node due to its integrated WiFi capabilities, sufficient processing power, and extensive peripheral support. The ESP32 features a dual-core Tensilica Xtensa LX6 microprocessor running at up to 240 MHz, providing adequate computational resources for sensor data processing and network communication. The integrated 802.11 b/g/n WiFi transceiver eliminates the need for additional networking hardware, reducing both cost and complexity.

The ESP32 development board includes essential components such as voltage regulators, crystal oscillators, and USB-to-serial converters that simplify the development process. The board provides numerous GPIO pins for sensor interfacing, including analog-to-digital converters, I2C, SPI, and UART communication interfaces. The built-in flash memory stores the firmware program, while the RAM provides sufficient space for data buffering and network operations.


Air Quality Sensor Selection


For air pollution measurement, the MQ-135 gas sensor provides a cost-effective way to detect a range of air pollutants, including ammonia, nitrogen oxides, benzene, smoke, and carbon dioxide. This semiconductor-based sensor works by changing its electrical conductivity when exposed to target gases. It responds to all of these gases at once and cannot tell them apart, so its reading is best treated as a general air quality indicator. The sensor resistance falls as pollutant concentration rises, following a nonlinear (approximately power-law) curve, and the module exposes the result as an analog voltage that the ESP32 can read with its ADC.

The MQ-135 sensor requires a heating element to maintain its operating temperature. The datasheet rates the heater at up to about 800 milliwatts at its nominal 5 V heater supply, so the heater dominates the power budget of a sensor node. The sensor module includes both analog and digital outputs, though the analog output provides more precise measurements suitable for quantitative analysis. The response time typically ranges from 10 to 60 seconds, making it suitable for continuous monitoring applications.


Circuit Design and Connections


The hardware circuit connects the MQ-135 sensor to the ESP32 through a simple interface that requires minimal external components. The sensor's VCC pin connects to the ESP32's 3.3V power supply, while the ground pin connects to the common ground. The analog output pin connects to one of the ESP32's ADC-capable GPIO pins, specifically GPIO34 in this implementation.

A pull-up resistor of 10 kilohms connects between the sensor's analog output and the power supply to ensure stable readings. Additionally, a bypass capacitor of 100 microfarads connects between power and ground near the sensor to filter power supply noise. These components ensure reliable sensor operation and accurate measurements.

The complete circuit is suitable for compact, low-cost deployment. The development board's on-board voltage regulator eliminates the need for additional power conditioning circuits when the node is powered from a 5V USB source or an appropriate wall adapter.


ESP32 Air Quality Sensor Circuit Diagram:


                    +3.3V
                      |
                     +-+
                     | | 10kΩ
                     | |
                     +-+
                      |
    ESP32             |              MQ-135
   +-------+          |             +-------+
   |       |          +-------------|VCC    |
   |GPIO34 |<---------------------- |A0     |
   |       |                        |       |
   |GND    |------------------------|GND    |
   |       |                        |       |
   |3.3V   |------------------------|VCC    |
   +-------+                        +-------+
                                         |
                                       -----
                                       ----- 100µF
                                         |
                                        GND


ESP32 Firmware Implementation


Core System Architecture


The ESP32 firmware follows a modular architecture that separates concerns into distinct functional units. The main program loop coordinates between sensor reading, data processing, network communication, and power management functions. This separation ensures maintainable code and facilitates future enhancements or modifications.

The firmware cycles through distinct operational phases: initialization, sensor reading, data transmission, and an idle wait until the next measurement is due. In the listing below these phases are driven by a timed polling loop rather than an explicit state machine type, and deep sleep is entered only when the WiFi connection cannot be established.
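As a rough illustration of the phases described above, the cycle can be written as an explicit transition function. This is a sketch in plain C++, not the firmware listed in this article; the state names, event names, and the replay helper are assumptions chosen for the example.

```cpp
#include <cassert>

// Operational phases named in the text (the enum itself is illustrative).
enum class NodeState { Initializing, ReadingSensor, Transmitting, Sleeping };

// Events that drive the transitions (hypothetical names).
enum class NodeEvent { InitDone, ReadingReady, ReadingInvalid, SendFinished, WakeUp };

// Pure transition function: current state plus event gives the next state.
NodeState nextState(NodeState state, NodeEvent event) {
    switch (state) {
        case NodeState::Initializing:
            if (event == NodeEvent::InitDone) return NodeState::ReadingSensor;
            break;
        case NodeState::ReadingSensor:
            if (event == NodeEvent::ReadingReady) return NodeState::Transmitting;
            if (event == NodeEvent::ReadingInvalid) return NodeState::Sleeping;
            break;
        case NodeState::Transmitting:
            if (event == NodeEvent::SendFinished) return NodeState::Sleeping;
            break;
        case NodeState::Sleeping:
            if (event == NodeEvent::WakeUp) return NodeState::ReadingSensor;
            break;
    }
    return state;  // events that do not apply leave the state unchanged
}

// Replays a sequence of events from `start` and returns the state reached.
NodeState replay(NodeState start, const NodeEvent* events, int count) {
    assert(count >= 0 && (events != nullptr || count == 0));
    NodeState state = start;
    for (int i = 0; i < count; ++i) {
        state = nextState(state, events[i]);
    }
    return state;
}
```

Keeping the transition logic in a pure function like this makes it possible to test the cycle on a desktop machine, with the hardware-specific actions attached to each state separately.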


WiFi Network Management


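The WiFi connection management system handles network connectivity with automatic reconnection. For brevity, the listing below hardcodes the network credentials as constants; a deployed device would typically keep them in non-volatile memory. The device attempts to connect to the configured network during startup and gives up after a timeout. After a fixed number of failed attempts it enters deep sleep until the next measurement interval, which avoids overwhelming the network infrastructure. An exponential backoff between attempts is a natural refinement but is not part of this listing.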


#include <WiFi.h>

#include <HTTPClient.h>

#include <ArduinoJson.h>

#include <EEPROM.h>

#include <esp_system.h>


// Network configuration constants

const char* WIFI_SSID = "YourNetworkName";

const char* WIFI_PASSWORD = "YourNetworkPassword";

const char* SERVER_URL = "http://your-server.com/api/sensor-data";


// Device configuration

const int SENSOR_PIN = 34;

const int MEASUREMENT_INTERVAL = 600000; // 10 minutes in milliseconds

const int MAX_RETRY_ATTEMPTS = 3;

const int WIFI_TIMEOUT = 10000; // 10 seconds


// Global variables for device operation

String deviceUUID;

unsigned long lastMeasurementTime = 0;

bool wifiConnected = false;


class WiFiManager {

private:

    int connectionAttempts;

    unsigned long lastConnectionAttempt;

    

public:

    WiFiManager() : connectionAttempts(0), lastConnectionAttempt(0) {}

    

    bool initializeConnection() {

        WiFi.mode(WIFI_STA);

        WiFi.begin(WIFI_SSID, WIFI_PASSWORD);

        

        Serial.println("Connecting to WiFi network...");

        

        unsigned long startTime = millis();

        while (WiFi.status() != WL_CONNECTED && 

               (millis() - startTime) < WIFI_TIMEOUT) {

            delay(500);

            Serial.print(".");

        }

        

        if (WiFi.status() == WL_CONNECTED) {

            Serial.println("\nWiFi connected successfully");

            Serial.print("IP address: ");

            Serial.println(WiFi.localIP());

            connectionAttempts = 0;

            return true;

        } else {

            Serial.println("\nWiFi connection failed");

            connectionAttempts++;

            return false;

        }

    }

    

    bool isConnected() {

        return WiFi.status() == WL_CONNECTED;

    }

    

    bool reconnectIfNeeded() {

        if (!isConnected()) {

            Serial.println("WiFi disconnected, attempting reconnection...");

            return initializeConnection();

        }

        return true;

    }

    

    void handleConnectionFailure() {

        if (connectionAttempts >= MAX_RETRY_ATTEMPTS) {

            Serial.println("Maximum connection attempts reached, entering deep sleep");

            esp_deep_sleep((uint64_t)MEASUREMENT_INTERVAL * 1000ULL); // interval is in ms, esp_deep_sleep expects microseconds

        }

    }

};


The WiFiManager class encapsulates all network-related functionality, providing clean interfaces for connection management. The class implements retry logic with configurable timeouts to handle temporary network issues gracefully. When maximum retry attempts are reached, the device enters deep sleep mode to conserve power and avoid continuous failed connection attempts.
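The class above retries a fixed number of times with the same timeout. If a growing pause between attempts is wanted, a capped exponential backoff schedule could be computed as sketched below. The function name and its parameters are hypothetical and do not appear in the listing; the sketch is plain C++ so that it can be tried outside the Arduino environment.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Delay before retry number `attempt` (0-based): baseMs * 2^attempt,
// capped at maxMs. A 64-bit accumulator avoids overflow while doubling.
uint32_t backoffDelayMs(uint32_t attempt, uint32_t baseMs, uint32_t maxMs) {
    assert(baseMs > 0 && maxMs >= baseMs);
    uint64_t delayMs = baseMs;
    for (uint32_t i = 0; i < attempt && delayMs < maxMs; ++i) {
        delayMs *= 2;  // double after every failed attempt
    }
    return static_cast<uint32_t>(std::min<uint64_t>(delayMs, maxMs));
}
```

With a base of 1000 ms and a cap of 60000 ms the schedule is 1 s, 2 s, 4 s, and so on until it levels off at 60 s. In the firmware, the returned value could be passed to delay() before the next call to initializeConnection().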


Sensor Data Acquisition System


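The sensor data acquisition system reads the MQ-135 air quality sensor and converts its output to a concentration estimate. After a warm-up period, each measurement averages ten consecutive readings to reduce noise, converts the averaged sensor resistance to PPM using a power-law calibration curve, and reports an error value if the sensor has not warmed up. The listing does not apply temperature or humidity compensation, and the baseline resistance it records at startup is only logged; the conversion uses the fixed RZERO constant.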


class AirQualitySensor {

private:

    int sensorPin;

    float baselineResistance;

    int warmupTime;

    bool isWarmedUp;

    

    // Calibration constants for MQ-135 sensor

    static constexpr float RLOAD = 10.0; // Load resistance in kΩ

    static constexpr float RZERO = 76.63; // Sensor resistance in clean air

    static constexpr float PARA = 116.6020682; // Calibration parameter

    static constexpr float PARB = 2.769034857; // Calibration parameter

    

public:

    AirQualitySensor(int pin) : sensorPin(pin), baselineResistance(0.0), warmupTime(20000), isWarmedUp(false) { // initializers follow declaration order

        pinMode(sensorPin, INPUT);

        baselineResistance = 0.0;

    }

    

    void initialize() {

        Serial.println("Initializing air quality sensor...");

        

        // Allow sensor to warm up

        unsigned long startTime = millis();

        while ((millis() - startTime) < warmupTime) {

            delay(1000);

            Serial.print("Warming up sensor... ");

            Serial.print((millis() - startTime) / 1000);

            Serial.println(" seconds");

        }

        

        // Calculate baseline resistance in clean air

        baselineResistance = calculateResistance();

        isWarmedUp = true;

        

        Serial.print("Sensor initialization complete. Baseline resistance: ");

        Serial.print(baselineResistance);

        Serial.println(" kΩ");

    }

    

    float readRawValue() {

        if (!isWarmedUp) {

            Serial.println("Warning: Sensor not properly warmed up");

            return -1.0;

        }

        

        // Take multiple readings for averaging

        const int numReadings = 10;

        float totalValue = 0.0;

        

        for (int i = 0; i < numReadings; i++) {

            int analogValue = analogRead(sensorPin);

            float voltage = (analogValue / 4095.0) * 3.3; // Convert to voltage

            float resistance = calculateResistanceFromVoltage(voltage);

            totalValue += resistance;

            delay(100); // Small delay between readings

        }

        

        return totalValue / numReadings;

    }

    

    float calculatePPM() {

        float resistance = readRawValue();

        if (resistance < 0) {

            return -1.0; // Error condition

        }

        

        // Calculate ratio of current resistance to RZERO, the fixed clean-air reference

        float ratio = resistance / RZERO;

        

        // Convert resistance ratio to PPM using calibration curve

        float ppm = PARA * pow(ratio, -PARB);

        

        return ppm;

    }

    

private:

    float calculateResistance() {

        int analogValue = analogRead(sensorPin);

        float voltage = (analogValue / 4095.0) * 3.3;

        return calculateResistanceFromVoltage(voltage);

    }

    

    float calculateResistanceFromVoltage(float voltage) {

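        // Clamp the voltage away from 0 V and 3.3 V so the divider formula

        // below cannot divide by zero (0 V) or return a zero resistance

        // (3.3 V), which the PPM power law would turn into infinity

        voltage = constrain(voltage, 0.01f, 3.29f);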

        

        // Calculate sensor resistance using voltage divider formula

        float resistance = ((3.3 - voltage) / voltage) * RLOAD;

        return resistance;

    }

};


The AirQualitySensor class implements comprehensive sensor management including initialization, calibration, and measurement functions. The class performs statistical averaging across multiple readings to reduce measurement noise and improve data quality. The implementation includes proper sensor warm-up procedures and resistance-to-PPM conversion using calibrated parameters specific to the MQ-135 sensor.
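Plain averaging, as used in readRawValue(), is sensitive to a single spike in the batch. One possible refinement, shown here only as a sketch, is to discard samples that lie far from the median before averaging. The function names and the relative tolerance parameter are assumptions for this example and are not part of the class above.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Median of a batch of samples. The vector is taken by value so that
// sorting does not reorder the caller's data.
float medianOf(std::vector<float> samples) {
    assert(!samples.empty());
    std::sort(samples.begin(), samples.end());
    const std::size_t mid = samples.size() / 2;
    if (samples.size() % 2 == 1) return samples[mid];
    return (samples[mid - 1] + samples[mid]) / 2.0f;
}

// Mean of the samples within `tolerance` (relative, e.g. 0.2 = 20%) of the
// median. Falls back to the median itself if every sample is rejected.
float filteredMean(const std::vector<float>& samples, float tolerance) {
    const float median = medianOf(samples);
    float sum = 0.0f;
    int kept = 0;
    for (float s : samples) {
        if (std::fabs(s - median) <= tolerance * std::fabs(median)) {
            sum += s;
            ++kept;
        }
    }
    return kept > 0 ? sum / kept : median;
}
```

For the batch 10, 11, 9, 10, 500 with a tolerance of 0.2, the median is 10, the spike at 500 is rejected, and the result is the mean of the remaining four samples.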


Data Transmission and Communication Protocol


The data transmission system sends sensor measurements to the central server over HTTP. Each measurement is serialized as a JSON document and sent in a POST request. The sender treats any response outside the 2xx range, as well as transport-level errors, as a failed transmission.


class DataTransmissionManager {

private:

    String serverURL;

    String deviceID;

    HTTPClient httpClient;

    

public:

    DataTransmissionManager(const String& url, const String& id) 

        : serverURL(url), deviceID(id) {}

    

    bool transmitSensorData(float ppmValue, float latitude, float longitude) {

        if (ppmValue < 0) {

            Serial.println("Invalid sensor reading, skipping transmission");

            return false;

        }

        

        // Create JSON payload

        DynamicJsonDocument jsonDoc(1024);

        jsonDoc["device_id"] = deviceID;

        jsonDoc["timestamp"] = getCurrentTimestamp();

        jsonDoc["air_quality_ppm"] = ppmValue;

        jsonDoc["latitude"] = latitude;

        jsonDoc["longitude"] = longitude;

        jsonDoc["sensor_type"] = "MQ135";

        jsonDoc["firmware_version"] = "1.0.0";

        

        String jsonString;

        serializeJson(jsonDoc, jsonString);

        

        Serial.println("Transmitting sensor data:");

        Serial.println(jsonString);

        

        // Configure HTTP client

        httpClient.begin(serverURL);

        httpClient.addHeader("Content-Type", "application/json");

        httpClient.addHeader("User-Agent", "ESP32-AirQuality-Sensor/1.0");

        httpClient.setTimeout(15000); // 15 second timeout

        

        // Send POST request

        int httpResponseCode = httpClient.POST(jsonString);

        

        if (httpResponseCode > 0) {

            String response = httpClient.getString();

            Serial.print("HTTP Response Code: ");

            Serial.println(httpResponseCode);

            Serial.print("Server Response: ");

            Serial.println(response);

            

            httpClient.end();

            return (httpResponseCode >= 200 && httpResponseCode < 300);

        } else {

            Serial.print("HTTP Error: ");

            Serial.println(httpClient.errorToString(httpResponseCode));

            httpClient.end();

            return false;

        }

    }

    

private:

    unsigned long getCurrentTimestamp() {

        // In a production system, this would sync with NTP server

        // For simplicity, using millis() as relative timestamp

        return millis();

    }

};
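Because getCurrentTimestamp() returns millis(), the value sent to the server is the time since boot rather than wall-clock time, while the server-side measurement model later in this article stores the timestamp as a date. Once a device has a real Unix time, for example after an NTP sync, it could be formatted as an ISO 8601 string before being placed in the payload. The helper below is a hypothetical sketch in plain C++; how the epoch value is obtained is outside its scope.

```cpp
#include <cassert>
#include <cstddef>
#include <ctime>
#include <string>

// Formats Unix epoch seconds as an ISO 8601 UTC string such as
// "1970-01-01T00:00:00Z". Returns an empty string if the value cannot be
// converted to a calendar time.
std::string toIso8601Utc(std::time_t epochSeconds) {
    const std::tm* utc = std::gmtime(&epochSeconds);
    if (utc == nullptr) return std::string();
    char buf[32];
    const std::size_t written =
        std::strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%SZ", utc);
    assert(written > 0);  // 32 bytes is ample for this fixed-width format
    return std::string(buf, written);
}
```

A string in this form can be parsed directly by the JavaScript Date constructor on the server side.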


Device Registration and UUID Management


The device registration system ensures each sensor node has a unique identifier and properly registers with the central server during initial setup. The system stores the UUID in non-volatile memory to maintain consistency across power cycles.


class DeviceManager {

private:

    String uuid;

    bool isRegistered;

    

public:

    DeviceManager() : isRegistered(false) {}

    

    void initialize() {

        // Try to load existing UUID from EEPROM

        EEPROM.begin(512);

        uuid = loadUUIDFromEEPROM();

        

        if (uuid.length() == 0) {

            // Generate new UUID if none exists

            uuid = generateUUID();

            saveUUIDToEEPROM(uuid);

            Serial.println("Generated new device UUID: " + uuid);

        } else {

            Serial.println("Loaded existing UUID: " + uuid);

        }

    }

    

    String getDeviceUUID() {

        return uuid;

    }

    

    bool registerWithServer(float latitude, float longitude) {

        HTTPClient httpClient;

        String registrationURL = "http://your-server.com/api/register-device";

        

        // Create registration payload

        DynamicJsonDocument jsonDoc(512);

        jsonDoc["device_id"] = uuid;

        jsonDoc["device_type"] = "air_quality_sensor";

        jsonDoc["latitude"] = latitude;

        jsonDoc["longitude"] = longitude;

        jsonDoc["sensor_model"] = "MQ135";

        jsonDoc["firmware_version"] = "1.0.0";

        

        String jsonString;

        serializeJson(jsonDoc, jsonString);

        

        httpClient.begin(registrationURL);

        httpClient.addHeader("Content-Type", "application/json");

        

        int responseCode = httpClient.POST(jsonString);

        

        if (responseCode >= 200 && responseCode < 300) {

            Serial.println("Device registered successfully with server");

            isRegistered = true;

            httpClient.end();

            return true;

        } else {

            Serial.print("Device registration failed. Response code: ");

            Serial.println(responseCode);

            httpClient.end();

            return false;

        }

    }

    

private:

    String generateUUID() {

        // Simple UUID generation based on ESP32 chip ID and random values

        uint64_t chipId = ESP.getEfuseMac();

        String uuid = String((uint32_t)(chipId >> 32), HEX) + 

                     String((uint32_t)chipId, HEX) + 

                     String(random(0x10000000, 0x7FFFFFFF), HEX); // upper bound must fit in a signed 32-bit long

        uuid.toUpperCase();

        return uuid;

    }

    

    void saveUUIDToEEPROM(const String& uuid) {

        for (int i = 0; i < uuid.length() && i < 32; i++) {

            EEPROM.write(i, uuid[i]);

        }

        EEPROM.write(uuid.length(), '\0');

        EEPROM.commit();

    }

    

    String loadUUIDFromEEPROM() {

        String uuid = "";

        char ch;

        for (int i = 0; i < 32; i++) {

            ch = EEPROM.read(i);

            if (ch == '\0' || (uint8_t)ch == 0xFF) break; // 0xFF marks storage that was never written

            uuid += ch;

        }

        return uuid;

    }

};
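The Arduino String(value, HEX) conversion used in generateUUID() does not pad with leading zeros, so the resulting identifier can vary in length from device to device. If a fixed-width identifier is preferred, the parts can be formatted with snprintf instead. The function below is a hypothetical sketch in plain C++; on the device, the two arguments would be the 48-bit value from ESP.getEfuseMac() and a random 32-bit number.

```cpp
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <string>

// Builds a fixed-width, 20-character uppercase hex identifier from a
// 48-bit MAC address (12 digits) and a 32-bit random value (8 digits).
std::string formatDeviceId(uint64_t mac48, uint32_t randomPart) {
    assert((mac48 >> 48) == 0);  // only the low 48 bits may be set
    char buf[21];                // 12 + 8 hex digits + terminating NUL
    std::snprintf(buf, sizeof(buf), "%012" PRIX64 "%08" PRIX32, mac48, randomPart);
    return std::string(buf);
}
```

A fixed length also makes the EEPROM layout predictable, since the identifier always occupies the same number of bytes.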


Main Program Structure and Execution Flow


The main program coordinates all system components and implements the primary execution loop. The program handles initialization, periodic sensor readings, data transmission, and power management.


// Global object instances

WiFiManager wifiManager;

AirQualitySensor airSensor(SENSOR_PIN);

DataTransmissionManager dataManager(SERVER_URL, "");

DeviceManager deviceManager;


void setup() {

    Serial.begin(115200);

    Serial.println("ESP32 Air Quality Sensor Starting...");

    

    // Initialize random number generator

    randomSeed(analogRead(0));

    

    // Initialize device manager and get UUID

    deviceManager.initialize();

    deviceUUID = deviceManager.getDeviceUUID(); // assign the global instead of shadowing it

    

    // Update data manager with device UUID

    dataManager = DataTransmissionManager(SERVER_URL, deviceUUID);

    

    // Initialize WiFi connection

    if (!wifiManager.initializeConnection()) {

        Serial.println("Failed to connect to WiFi, entering deep sleep");

        esp_deep_sleep((uint64_t)MEASUREMENT_INTERVAL * 1000ULL); // interval is in ms, esp_deep_sleep expects microseconds

    }

    

    // Register device with server (with hardcoded coordinates for this example)

    // In production, GPS module would provide actual coordinates

    float deviceLatitude = 40.7128;  // Example: New York City

    float deviceLongitude = -74.0060;

    

    if (!deviceManager.registerWithServer(deviceLatitude, deviceLongitude)) {

        Serial.println("Device registration failed, continuing with measurements");

    }

    

    // Initialize air quality sensor

    airSensor.initialize();

    

    Serial.println("System initialization complete");

    lastMeasurementTime = millis();

}


void loop() {

    unsigned long currentTime = millis();

    

    // Check if it's time for next measurement

    if (currentTime - lastMeasurementTime >= MEASUREMENT_INTERVAL) {

        

        // Ensure WiFi connection is active

        if (!wifiManager.reconnectIfNeeded()) {

            wifiManager.handleConnectionFailure();

            return;

        }

        

        // Read sensor data

        Serial.println("Taking air quality measurement...");

        float ppmValue = airSensor.calculatePPM();

        

        if (ppmValue >= 0) {

            Serial.print("Air quality reading: ");

            Serial.print(ppmValue);

            Serial.println(" PPM");

            

            // Transmit data to server

            float deviceLatitude = 40.7128;  // In production, get from GPS

            float deviceLongitude = -74.0060;

            

            bool transmissionSuccess = dataManager.transmitSensorData(

                ppmValue, deviceLatitude, deviceLongitude);

            

            if (transmissionSuccess) {

                Serial.println("Data transmitted successfully");

            } else {

                Serial.println("Data transmission failed");

            }

        } else {

            Serial.println("Invalid sensor reading, skipping transmission");

        }

        

        lastMeasurementTime = currentTime;

    }

    

    // Small delay to prevent excessive CPU usage

    delay(1000);

}


Central Web Server Implementation


Server Architecture and Technology Stack


The central web server implements a scalable architecture capable of handling multiple concurrent sensor devices while providing real-time data access to end users. The server uses Node.js with the Express framework for HTTP request handling and MongoDB for data persistence, and it exposes RESTful API endpoints for device communication and user queries.

The server architecture follows a modular design pattern with separate modules for device management, data processing, geographical calculations, and artificial intelligence-based data analysis. This separation ensures maintainable code and facilitates future enhancements or scaling requirements.


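Server Application Entry Point


The main server application wires together security middleware, rate limiting, request parsing, the API routes, and the database connection. The database schema behind it accommodates device registration information, sensor measurements, and user access patterns, and it is optimized for both the write-heavy traffic from sensor devices and the read-heavy traffic from user queries.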

Javascript


// server.js - Main server application

const express = require('express');

const mongoose = require('mongoose');

const cors = require('cors');

const helmet = require('helmet');

const rateLimit = require('express-rate-limit');

const compression = require('compression');


// Import custom modules

const deviceRoutes = require('./routes/devices');

const dataRoutes = require('./routes/data');

const analyticsRoutes = require('./routes/analytics');

const { connectDatabase } = require('./config/database');

const { logger } = require('./utils/logger');


// Express application setup

const app = express();

const PORT = process.env.PORT || 3000;


// Security middleware

app.use(helmet());

app.use(cors({

    origin: process.env.ALLOWED_ORIGINS?.split(',') || ['http://localhost:3000'],

    credentials: true

}));


// Rate limiting configuration

const limiter = rateLimit({

    windowMs: 15 * 60 * 1000, // 15 minutes

    max: 100, // Limit each IP to 100 requests per windowMs

    message: 'Too many requests from this IP, please try again later.'

});

app.use('/api/', limiter);


// Compression and parsing middleware

app.use(compression());

app.use(express.json({ limit: '10mb' }));

app.use(express.urlencoded({ extended: true }));


// Request logging middleware

app.use((req, res, next) => {

    logger.info(`${req.method} ${req.path} - ${req.ip}`);

    next();

});


// API routes

app.use('/api/devices', deviceRoutes);

app.use('/api/data', dataRoutes);

app.use('/api/analytics', analyticsRoutes);


// Health check endpoint

app.get('/health', (req, res) => {

    res.status(200).json({

        status: 'healthy',

        timestamp: new Date().toISOString(),

        uptime: process.uptime()

    });

});


// 404 handler for requests that matched no route

app.use((req, res) => {

    res.status(404).json({

        error: 'Not found',

        message: 'The requested resource was not found'

    });

});


// Error handling middleware, registered last so it catches errors from everything above

app.use((error, req, res, next) => {

    logger.error('Unhandled error:', error);

    res.status(500).json({

        error: 'Internal server error',

        message: process.env.NODE_ENV === 'development' ? error.message : 'Something went wrong'

    });

});


// Database connection and server startup

async function startServer() {

    try {

        await connectDatabase();

        logger.info('Database connected successfully');

        

        app.listen(PORT, () => {

            logger.info(`Air Quality Monitoring Server running on port ${PORT}`);

            logger.info(`Environment: ${process.env.NODE_ENV || 'development'}`);

        });

    } catch (error) {

        logger.error('Failed to start server:', error);

        process.exit(1);

    }

}


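// Graceful shutdown handling: close the database connection before exiting

async function shutdown(signal) {

    logger.info(`${signal} received, shutting down gracefully`);

    try {

        await mongoose.connection.close();

    } catch (error) {

        logger.error('Error while closing database connection:', error);

    }

    process.exit(0);

}


process.on('SIGTERM', () => shutdown('SIGTERM'));

process.on('SIGINT', () => shutdown('SIGINT'));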


startServer();


Database Models and Schema Definitions


The database models define the structure for storing device information, sensor measurements, and analytical results. The schema includes proper indexing for efficient geographical queries and time-based data retrieval.


Javascript


// models/Device.js - Device registration model

const mongoose = require('mongoose');


const deviceSchema = new mongoose.Schema({

    deviceId: {

        type: String,

        required: true,

        unique: true,

        index: true,

        trim: true,

        maxlength: 64

    },

    deviceType: {

        type: String,

        required: true,

        enum: ['air_quality_sensor', 'weather_station', 'noise_monitor'],

        default: 'air_quality_sensor'

    },

    location: {

        type: {

            type: String,

            enum: ['Point'],

            required: true,

            default: 'Point'

        },

        coordinates: {

            type: [Number],

            required: true,

            validate: {

                validator: function(coords) {

                    return coords.length === 2 && 

                           coords[0] >= -180 && coords[0] <= 180 && // longitude

                           coords[1] >= -90 && coords[1] <= 90;    // latitude

                },

                message: 'Invalid coordinates format'

            }

        }

    },

    sensorModel: {

        type: String,

        required: true,

        trim: true,

        maxlength: 50

    },

    firmwareVersion: {

        type: String,

        required: true,

        trim: true,

        maxlength: 20

    },

    registrationDate: {

        type: Date,

        default: Date.now,

        index: true

    },

    lastActivity: {

        type: Date,

        default: Date.now,

        index: true

    },

    isActive: {

        type: Boolean,

        default: true,

        index: true

    },

    calibrationData: {

        baselineResistance: Number,

        calibrationDate: Date,

        calibrationCoefficients: [Number]

    }

}, {

    timestamps: true

});


// Create geospatial index for location-based queries

deviceSchema.index({ location: '2dsphere' });


// Index for efficient device lookup

deviceSchema.index({ deviceId: 1, isActive: 1 });


// Update last activity timestamp on save

deviceSchema.pre('save', function(next) {

    this.lastActivity = new Date();

    next();

});


// Instance methods

deviceSchema.methods.updateActivity = function() {

    this.lastActivity = new Date();

    return this.save();

};


deviceSchema.methods.deactivate = function() {

    this.isActive = false;

    return this.save();

};


// Static methods

deviceSchema.statics.findActiveDevices = function() {

    return this.find({ isActive: true });

};


deviceSchema.statics.findDevicesInRadius = function(longitude, latitude, radiusMeters) {

    return this.find({

        location: {

            $geoWithin: {

                $centerSphere: [[longitude, latitude], radiusMeters / 6378100]

            }

        },

        isActive: true

    });

};


module.exports = mongoose.model('Device', deviceSchema);


Javascript


// models/SensorData.js - Sensor measurement model

const mongoose = require('mongoose');


const sensorDataSchema = new mongoose.Schema({

    deviceId: {

        type: String,

        required: true,

        index: true,

        ref: 'Device'

    },

    timestamp: {

        type: Date,

        required: true,

        index: true,

        default: Date.now

    },

    location: {

        type: {

            type: String,

            enum: ['Point'],

            required: true,

            default: 'Point'

        },

        coordinates: {

            type: [Number],

            required: true

        }

    },

    measurements: {

        airQualityPPM: {

            type: Number,

            required: true,

            min: 0,

            max: 10000,

            validate: {

                validator: function(value) {

                    return !isNaN(value) && isFinite(value);

                },

                message: 'Air quality PPM must be a valid number'

            }

        },

        temperature: {

            type: Number,

            min: -50,

            max: 100

        },

        humidity: {

            type: Number,

            min: 0,

            max: 100

        },

        pressure: {

            type: Number,

            min: 800,

            max: 1200

        }

    },

    sensorType: {

        type: String,

        required: true,

        enum: ['MQ135', 'MQ7', 'BME280', 'DHT22'],

        default: 'MQ135'

    },

    firmwareVersion: {

        type: String,

        required: true

    },

    dataQuality: {

        type: String,

        enum: ['excellent', 'good', 'fair', 'poor'],

        default: 'good'

    },

    processed: {

        type: Boolean,

        default: false,

        index: true

    }

}, {

    timestamps: true

});


// Compound indexes for efficient queries

sensorDataSchema.index({ deviceId: 1, timestamp: -1 });

sensorDataSchema.index({ timestamp: -1, processed: 1 });

sensorDataSchema.index({ location: '2dsphere', timestamp: -1 });


// TTL index: MongoDB deletes documents one year after their timestamp

sensorDataSchema.index({ 

    timestamp: -1 

}, { 

    expireAfterSeconds: 60 * 60 * 24 * 365 // 1 year retention

});


// Static methods for data analysis

sensorDataSchema.statics.getAverageInRadius = async function(longitude, latitude, radiusMeters, startTime, endTime) {

    const pipeline = [

        {

            $match: {

                location: {

                    $geoWithin: {

                        $centerSphere: [[longitude, latitude], radiusMeters / 6378100]

                    }

                },

                timestamp: {

                    $gte: startTime,

                    $lte: endTime

                }

            }

        },

        {

            $group: {

                _id: null,

                averageAirQuality: { $avg: '$measurements.airQualityPPM' },

                averageTemperature: { $avg: '$measurements.temperature' },

                averageHumidity: { $avg: '$measurements.humidity' },

                sampleCount: { $sum: 1 },

                deviceCount: { $addToSet: '$deviceId' }

            }

        },

        {

            $project: {

                _id: 0,

                averageAirQuality: { $round: ['$averageAirQuality', 2] },

                averageTemperature: { $round: ['$averageTemperature', 2] },

                averageHumidity: { $round: ['$averageHumidity', 2] },

                sampleCount: 1,

                deviceCount: { $size: '$deviceCount' }

            }

        }

    ];

    

    const result = await this.aggregate(pipeline);

    return result[0] || null;

};


sensorDataSchema.statics.getTimeSeriesData = function(deviceId, startTime, endTime, interval = '1h') {

    const groupBy = {

        '1m': { $dateToString: { format: '%Y-%m-%d %H:%M', date: '$timestamp' } },

        '1h': { $dateToString: { format: '%Y-%m-%d %H:00', date: '$timestamp' } },

        '1d': { $dateToString: { format: '%Y-%m-%d', date: '$timestamp' } }

    };

    

    return this.aggregate([

        {

            $match: {

                deviceId: deviceId,

                timestamp: { $gte: startTime, $lte: endTime }

            }

        },

        {

            $group: {

                _id: groupBy[interval] || groupBy['1h'],

                averageAirQuality: { $avg: '$measurements.airQualityPPM' },

                minAirQuality: { $min: '$measurements.airQualityPPM' },

                maxAirQuality: { $max: '$measurements.airQualityPPM' },

                sampleCount: { $sum: 1 }

            }

        },

        {

            $sort: { _id: 1 }

        }

    ]);

};


module.exports = mongoose.model('SensorData', sensorDataSchema);
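
The `getAverageInRadius` method passes `radiusMeters / 6378100` to `$centerSphere` because that operator expects the radius as an angle in radians, that is, the ground distance divided by the Earth's radius. The sketch below isolates that conversion. `metersToRadians` and `centerSphereFilter` are illustrative names rather than part of the model, and the constant is the same approximate equatorial radius used in the query above.

```javascript
// Approximate equatorial radius of the Earth in meters (the value used in the query above)
const EARTH_RADIUS_METERS = 6378100;

// Convert a ground distance in meters to the angular radius that $centerSphere expects
function metersToRadians(meters) {
    if (!Number.isFinite(meters) || meters < 0) {
        throw new RangeError('Radius must be a non-negative, finite number of meters');
    }
    return meters / EARTH_RADIUS_METERS;
}

// Build a $geoWithin filter for a circle around [longitude, latitude]
function centerSphereFilter(longitude, latitude, radiusMeters) {
    return {
        $geoWithin: {
            $centerSphere: [[longitude, latitude], metersToRadians(radiusMeters)]
        }
    };
}

// Example: a 1 km circle around a made-up point
const filter = centerSphereFilter(13.405, 52.52, 1000);
console.log(filter.$geoWithin.$centerSphere[1]); // roughly 0.000157 radians
```

Keeping the conversion in one place avoids repeating the magic number in every route that builds a geospatial filter.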


Device Registration and Management API


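The device management API handles onboarding of new devices, location updates, and deactivation. It validates incoming device information and maintains a registry in which each device is marked active or inactive. Re-registering a known device updates its existing record instead of creating a duplicate.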
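
The `validateDeviceRegistration` middleware used below lives in `middleware/validation` and is not reproduced in this section. As a rough idea of the checks such a validator might perform, here is a minimal sketch. The names `checkRegistrationPayload` and `validateRegistrationSketch` and the exact rules are assumptions; only the field names are taken from the registration route.

```javascript
// Hypothetical validator for a registration payload; returns a list of error strings
function checkRegistrationPayload(body) {
    const errors = [];
    const { device_id, latitude, longitude } = body || {};

    if (typeof device_id !== 'string' || device_id.trim() === '') {
        errors.push('device_id must be a non-empty string');
    }
    // Use explicit range checks: 0 is a valid latitude and longitude
    if (!Number.isFinite(latitude) || latitude < -90 || latitude > 90) {
        errors.push('latitude must be a number between -90 and 90');
    }
    if (!Number.isFinite(longitude) || longitude < -180 || longitude > 180) {
        errors.push('longitude must be a number between -180 and 180');
    }
    return errors;
}

// Express-style middleware wrapper around the check above
function validateRegistrationSketch(req, res, next) {
    const errors = checkRegistrationPayload(req.body);
    if (errors.length > 0) {
        return res.status(400).json({ success: false, errors });
    }
    next();
}
```

Separating the pure check from the middleware wrapper keeps the rules testable without starting an Express server.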


Javascript


// routes/devices.js - Device management routes

const express = require('express');

const Device = require('../models/Device');

const { logger } = require('../utils/logger');

const { validateDeviceRegistration } = require('../middleware/validation');


const router = express.Router();


// Register new device

router.post('/register', validateDeviceRegistration, async (req, res) => {

    try {

        const {

            device_id,

            device_type,

            latitude,

            longitude,

            sensor_model,

            firmware_version

        } = req.body;


        // Check if device already exists

        const existingDevice = await Device.findOne({ deviceId: device_id });

        if (existingDevice) {

            // Update existing device information

            existingDevice.location.coordinates = [longitude, latitude];

            existingDevice.sensorModel = sensor_model;

            existingDevice.firmwareVersion = firmware_version;

            existingDevice.isActive = true;

            

            await existingDevice.save();

            

            logger.info(`Device ${device_id} updated registration`);

            return res.status(200).json({

                success: true,

                message: 'Device registration updated successfully',

                deviceId: device_id

            });

        }


        // Create new device registration

        const newDevice = new Device({

            deviceId: device_id,

            deviceType: device_type || 'air_quality_sensor',

            location: {

                type: 'Point',

                coordinates: [longitude, latitude]

            },

            sensorModel: sensor_model,

            firmwareVersion: firmware_version,

            registrationDate: new Date(),

            isActive: true

        });


        await newDevice.save();

        

        logger.info(`New device registered: ${device_id}`);

        res.status(201).json({

            success: true,

            message: 'Device registered successfully',

            deviceId: device_id

        });


    } catch (error) {

        logger.error('Device registration error:', error);

        res.status(500).json({

            success: false,

            error: 'Device registration failed',

            message: error.message

        });

    }

});


// Get device information

router.get('/:deviceId', async (req, res) => {

    try {

        const { deviceId } = req.params;

        

        const device = await Device.findOne({ deviceId: deviceId });

        if (!device) {

            return res.status(404).json({

                success: false,

                error: 'Device not found'

            });

        }


        res.json({

            success: true,

            device: {

                deviceId: device.deviceId,

                deviceType: device.deviceType,

                location: device.location,

                sensorModel: device.sensorModel,

                firmwareVersion: device.firmwareVersion,

                registrationDate: device.registrationDate,

                lastActivity: device.lastActivity,

                isActive: device.isActive

            }

        });


    } catch (error) {

        logger.error('Get device error:', error);

        res.status(500).json({

            success: false,

            error: 'Failed to retrieve device information'

        });

    }

});


// List all active devices

router.get('/', async (req, res) => {

    try {

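        const { type } = req.query;

        // Query-string values arrive as strings; parse and clamp them before use

        // (the upper bound of 200 items per page is an arbitrary choice)

        const page = Math.max(parseInt(req.query.page, 10) || 1, 1);

        const limit = Math.min(Math.max(parseInt(req.query.limit, 10) || 50, 1), 200);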

        

        const query = { isActive: true };

        if (type) {

            query.deviceType = type;

        }


        const devices = await Device.find(query)

            .select('deviceId deviceType location sensorModel lastActivity')

            .limit(limit * 1)

            .skip((page - 1) * limit)

            .sort({ lastActivity: -1 });


        const totalDevices = await Device.countDocuments(query);


        res.json({

            success: true,

            devices: devices,

            pagination: {

                currentPage: parseInt(page),

                totalPages: Math.ceil(totalDevices / limit),

                totalDevices: totalDevices,

                hasNextPage: page * limit < totalDevices,

                hasPrevPage: page > 1

            }

        });


    } catch (error) {

        logger.error('List devices error:', error);

        res.status(500).json({

            success: false,

            error: 'Failed to retrieve device list'

        });

    }

});


// Update device location

router.put('/:deviceId/location', async (req, res) => {

    try {

        const { deviceId } = req.params;

        const { latitude, longitude } = req.body;


        // Test for missing values explicitly: 0 is a valid latitude or longitude

        if (latitude == null || longitude == null) {

            return res.status(400).json({

                success: false,

                error: 'Latitude and longitude are required'

            });

        }


        const device = await Device.findOne({ deviceId: deviceId });

        if (!device) {

            return res.status(404).json({

                success: false,

                error: 'Device not found'

            });

        }


        device.location.coordinates = [longitude, latitude];

        await device.save();


        logger.info(`Device ${deviceId} location updated`);

        res.json({

            success: true,

            message: 'Device location updated successfully'

        });


    } catch (error) {

        logger.error('Update device location error:', error);

        res.status(500).json({

            success: false,

            error: 'Failed to update device location'

        });

    }

});


// Deactivate device

router.delete('/:deviceId', async (req, res) => {

    try {

        const { deviceId } = req.params;

        

        const device = await Device.findOne({ deviceId: deviceId });

        if (!device) {

            return res.status(404).json({

                success: false,

                error: 'Device not found'

            });

        }


        await device.deactivate();

        

        logger.info(`Device ${deviceId} deactivated`);

        res.json({

            success: true,

            message: 'Device deactivated successfully'

        });


    } catch (error) {

        logger.error('Deactivate device error:', error);

        res.status(500).json({

            success: false,

            error: 'Failed to deactivate device'

        });

    }

});


module.exports = router;
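
The pagination fields returned by the list route follow from three numbers: the requested page, the page size, and the total count. A small pure function makes that arithmetic easy to test in isolation. `buildPagination` is a hypothetical helper, not part of the routes above, and its fallback values mirror the route's defaults of page 1 and 50 items.

```javascript
// Hypothetical helper: derive pagination metadata from raw query values
function buildPagination(rawPage, rawLimit, totalItems) {
    // Query-string values are strings, so parse them and fall back to the defaults
    const page = Math.max(parseInt(rawPage, 10) || 1, 1);
    const limit = Math.max(parseInt(rawLimit, 10) || 50, 1);
    const totalPages = Math.ceil(totalItems / limit);

    return {
        currentPage: page,
        totalPages,
        totalDevices: totalItems,
        skip: (page - 1) * limit, // number of documents to skip in the query
        limit,
        hasNextPage: page * limit < totalItems,
        hasPrevPage: page > 1
    };
}

// Page 2 of 120 devices at 50 per page: 3 pages in total, skip the first 50
console.log(buildPagination('2', '50', 120));
```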


Sensor Data Ingestion API


The data ingestion system receives sensor measurements from ESP32 devices, validates the data, and stores it in the database with proper indexing for efficient retrieval.
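
To make the data flow concrete, the sketch below shows a sample request body with the field names the ingestion route reads, and a pure function that maps it to the shape of a `SensorData` document. `toSensorDocument` is an illustrative helper and the sample values are made up; the route itself builds the document inline.

```javascript
// Sample payload, using the snake_case field names the ingestion route destructures
const samplePayload = {
    device_id: 'esp32-001', // made-up identifier
    timestamp: '2025-12-30T10:15:00Z',
    air_quality_ppm: 412.5,
    latitude: 48.1374,
    longitude: 11.5755,
    sensor_type: 'MQ135',
    firmware_version: '1.0.0',
    temperature: 21.4,
    humidity: 45
};

// Map a request body to the nested, camelCase shape of the SensorData schema
function toSensorDocument(body, now = new Date()) {
    return {
        deviceId: body.device_id,
        // Fall back to the server clock when the device sends no timestamp
        timestamp: body.timestamp ? new Date(body.timestamp) : now,
        location: {
            type: 'Point',
            // GeoJSON order is [longitude, latitude], not [latitude, longitude]
            coordinates: [body.longitude, body.latitude]
        },
        measurements: {
            airQualityPPM: body.air_quality_ppm,
            temperature: body.temperature,
            humidity: body.humidity
        },
        sensorType: body.sensor_type || 'MQ135',
        firmwareVersion: body.firmware_version
    };
}

const doc = toSensorDocument(samplePayload);
console.log(doc.location.coordinates); // [ 11.5755, 48.1374 ]
```

The coordinate order is the detail most worth double-checking: the 2dsphere index interprets the first array element as longitude.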


Javascript


// routes/data.js - Sensor data ingestion and retrieval routes

const express = require('express');

const SensorData = require('../models/SensorData');

const Device = require('../models/Device');

const { logger } = require('../utils/logger');

const { validateSensorData } = require('../middleware/validation');

const { calculateAirQualityIndex } = require('../utils/airQuality');


const router = express.Router();


// Receive sensor data from devices

router.post('/sensor-data', validateSensorData, async (req, res) => {

    try {

        const {

            device_id,

            timestamp,

            air_quality_ppm,

            latitude,

            longitude,

            sensor_type,

            firmware_version,

            temperature,

            humidity

        } = req.body;


        // Verify device exists and is active

        const device = await Device.findOne({ 

            deviceId: device_id, 

            isActive: true 

        });

        

        if (!device) {

            return res.status(404).json({

                success: false,

                error: 'Device not found or inactive'

            });

        }


        // Update device last activity

        await device.updateActivity();


        // Create sensor data record

        const sensorData = new SensorData({

            deviceId: device_id,

            timestamp: timestamp ? new Date(timestamp) : new Date(),

            location: {

                type: 'Point',

                coordinates: [longitude, latitude]

            },

            measurements: {

                airQualityPPM: air_quality_ppm,

                temperature: temperature,

                humidity: humidity

            },

            sensorType: sensor_type || 'MQ135',

            firmwareVersion: firmware_version,

            dataQuality: calculateDataQuality(air_quality_ppm, temperature, humidity)

        });


        await sensorData.save();


        logger.info(`Sensor data received from device ${device_id}: ${air_quality_ppm} PPM`);

        

        res.status(201).json({

            success: true,

            message: 'Sensor data received successfully',

            dataId: sensorData._id,

            airQualityIndex: calculateAirQualityIndex(air_quality_ppm)

        });


    } catch (error) {

        logger.error('Sensor data ingestion error:', error);

        res.status(500).json({

            success: false,

            error: 'Failed to process sensor data',

            message: error.message

        });

    }

});


// Get aggregated data for a geographical area

router.get('/area-average', async (req, res) => {

    try {

        const {

            latitude,

            longitude,

            radius = 1000, // Default 1km radius

            start_time,

            end_time,

            interval = '1h'

        } = req.query;


        if (!latitude || !longitude) {

            return res.status(400).json({

                success: false,

                error: 'Latitude and longitude are required'

            });

        }


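        const lat = parseFloat(latitude);

        const lng = parseFloat(longitude);

        const radiusMeters = parseInt(radius, 10);


        // Reject non-numeric input before it reaches the geospatial query

        if (![lat, lng, radiusMeters].every(Number.isFinite) || radiusMeters <= 0) {

            return res.status(400).json({

                success: false,

                error: 'Latitude, longitude, and radius must be valid numbers'

            });

        }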


        // Default time range: last 24 hours

        const endTime = end_time ? new Date(end_time) : new Date();

        const startTime = start_time ? new Date(start_time) : 

                         new Date(endTime.getTime() - 24 * 60 * 60 * 1000);


        // Get average data for the area

        const averageData = await SensorData.getAverageInRadius(

            lng, lat, radiusMeters, startTime, endTime

        );


        if (!averageData) {

            return res.json({

                success: true,

                message: 'No data available for the specified area and time range',

                data: null

            });

        }


        // Calculate air quality index and health recommendations

        const airQualityIndex = calculateAirQualityIndex(averageData.averageAirQuality);

        const healthRecommendation = getHealthRecommendation(airQualityIndex);


        res.json({

            success: true,

            data: {

                location: {

                    latitude: lat,

                    longitude: lng,

                    radius: radiusMeters

                },

                timeRange: {

                    startTime: startTime,

                    endTime: endTime

                },

                measurements: {

                    averageAirQuality: averageData.averageAirQuality,

                    averageTemperature: averageData.averageTemperature,

                    averageHumidity: averageData.averageHumidity,

                    airQualityIndex: airQualityIndex,

                    healthRecommendation: healthRecommendation

                },

                statistics: {

                    sampleCount: averageData.sampleCount,

                    deviceCount: averageData.deviceCount

                }

            }

        });


    } catch (error) {

        logger.error('Area average calculation error:', error);

        res.status(500).json({

            success: false,

            error: 'Failed to calculate area average'

        });

    }

});


// Get time series data for a specific device

router.get('/device/:deviceId/timeseries', async (req, res) => {

    try {

        const { deviceId } = req.params;

        const {

            start_time,

            end_time,

            interval = '1h'

        } = req.query;


        // Verify device exists

        const device = await Device.findOne({ deviceId: deviceId });

        if (!device) {

            return res.status(404).json({

                success: false,

                error: 'Device not found'

            });

        }


        // Default time range: last 7 days

        const endTime = end_time ? new Date(end_time) : new Date();

        const startTime = start_time ? new Date(start_time) : 

                         new Date(endTime.getTime() - 7 * 24 * 60 * 60 * 1000);


        const timeSeriesData = await SensorData.getTimeSeriesData(

            deviceId, startTime, endTime, interval

        );


        res.json({

            success: true,

            deviceId: deviceId,

            timeRange: {

                startTime: startTime,

                endTime: endTime,

                interval: interval

            },

            data: timeSeriesData

        });


    } catch (error) {

        logger.error('Time series data error:', error);

        res.status(500).json({

            success: false,

            error: 'Failed to retrieve time series data'

        });

    }

});


// Helper functions

function calculateDataQuality(airQuality, temperature, humidity) {

    let qualityScore = 100;

    

    // Penalize extreme or unrealistic values

    if (airQuality < 0 || airQuality > 1000) qualityScore -= 50;

    if (temperature && (temperature < -40 || temperature > 60)) qualityScore -= 30;

    if (humidity && (humidity < 0 || humidity > 100)) qualityScore -= 30;

    

    if (qualityScore >= 90) return 'excellent';

    if (qualityScore >= 70) return 'good';

    if (qualityScore >= 50) return 'fair';

    return 'poor';

}


function getHealthRecommendation(airQualityIndex) {

    if (airQualityIndex <= 50) {

        return 'Air quality is good. Ideal for outdoor activities.';

    } else if (airQualityIndex <= 100) {

        return 'Air quality is moderate. Sensitive individuals should consider limiting outdoor activities.';

    } else if (airQualityIndex <= 150) {

        return 'Air quality is unhealthy for sensitive groups. Limit outdoor activities if you are sensitive to air pollution.';

    } else if (airQualityIndex <= 200) {

        return 'Air quality is unhealthy. Everyone should limit outdoor activities.';

    } else {

        return 'Air quality is very unhealthy. Avoid outdoor activities.';

    }

}


module.exports = router;
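
A client of the area-average endpoint only needs to assemble a query string with the parameter names the route reads. The sketch below does that with the standard `URL` API. `buildAreaAverageUrl` is a hypothetical helper, and the base URL `http://localhost:3000/api/data` is an assumption, since the prefix under which this router is mounted is not shown in this section.

```javascript
// Build the query URL for the area-average endpoint.
// The base URL is an assumption; use whatever prefix the router is mounted under.
function buildAreaAverageUrl(baseUrl, { latitude, longitude, radius, startTime, endTime }) {
    // A trailing slash makes the relative path resolve below the base path
    const url = new URL('area-average', baseUrl.endsWith('/') ? baseUrl : baseUrl + '/');
    url.searchParams.set('latitude', String(latitude));
    url.searchParams.set('longitude', String(longitude));
    if (radius !== undefined) url.searchParams.set('radius', String(radius));
    // The route parses these with new Date(...), so ISO 8601 strings are a safe choice
    if (startTime) url.searchParams.set('start_time', startTime.toISOString());
    if (endTime) url.searchParams.set('end_time', endTime.toISOString());
    return url.toString();
}

const areaUrl = buildAreaAverageUrl('http://localhost:3000/api/data', {
    latitude: 48.1374,
    longitude: 11.5755,
    radius: 500
});
console.log(areaUrl);
// http://localhost:3000/api/data/area-average?latitude=48.1374&longitude=11.5755&radius=500
```

When `radius`, `start_time`, and `end_time` are omitted, the route falls back to a 1000 m radius and the last 24 hours.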


Artificial Intelligence Data Analysis System


The AI analysis system processes the collected sensor data to provide summaries, trend analysis, anomaly detection, and predictions. The routes in this section compute trends and anomalies with simple statistics (daily averages, percentage change, and z-scores) and delegate pattern recognition and prediction to a separate analysis service.


Javascript


// routes/analytics.js - AI-powered data analysis routes

const express = require('express');

const SensorData = require('../models/SensorData');

const Device = require('../models/Device');

const { logger } = require('../utils/logger');

const { performAIAnalysis } = require('../services/aiAnalysis');


const router = express.Router();


// AI-powered area analysis

router.get('/ai-analysis', async (req, res) => {

    try {

        const {

            latitude,

            longitude,

            radius = 2000,

            analysis_type = 'comprehensive'

        } = req.query;


        if (!latitude || !longitude) {

            return res.status(400).json({

                success: false,

                error: 'Latitude and longitude are required'

            });

        }


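        const lat = parseFloat(latitude);

        const lng = parseFloat(longitude);

        const radiusMeters = parseInt(radius, 10);


        // Reject non-numeric input before it reaches the geospatial query

        if (![lat, lng, radiusMeters].every(Number.isFinite) || radiusMeters <= 0) {

            return res.status(400).json({

                success: false,

                error: 'Latitude, longitude, and radius must be valid numbers'

            });

        }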


        // Get recent data for AI analysis

        const endTime = new Date();

        const startTime = new Date(endTime.getTime() - 7 * 24 * 60 * 60 * 1000); // Last 7 days


        // Retrieve raw sensor data for analysis

        const sensorData = await SensorData.find({

            location: {

                $geoWithin: {

                    $centerSphere: [[lng, lat], radiusMeters / 6378100]

                }

            },

            timestamp: {

                $gte: startTime,

                $lte: endTime

            }

        }).sort({ timestamp: -1 }).limit(1000);


        if (sensorData.length === 0) {

            return res.json({

                success: true,

                message: 'Insufficient data for AI analysis',

                analysis: null

            });

        }


        // Perform AI analysis

        const aiAnalysis = await performAIAnalysis(sensorData, {

            analysisType: analysis_type,

            location: { latitude: lat, longitude: lng },

            radius: radiusMeters

        });


        res.json({

            success: true,

            analysis: aiAnalysis,

            dataPoints: sensorData.length,

            analysisTimestamp: new Date()

        });


    } catch (error) {

        logger.error('AI analysis error:', error);

        res.status(500).json({

            success: false,

            error: 'Failed to perform AI analysis'

        });

    }

});


// Trend analysis for specific location

router.get('/trends', async (req, res) => {

    try {

        const {

            latitude,

            longitude,

            radius = 1000,

            period = '30d'

        } = req.query;


        if (!latitude || !longitude) {

            return res.status(400).json({

                success: false,

                error: 'Latitude and longitude are required'

            });

        }


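        const lat = parseFloat(latitude);

        const lng = parseFloat(longitude);

        const radiusMeters = parseInt(radius, 10);


        // Reject non-numeric input before it reaches the geospatial query

        if (![lat, lng, radiusMeters].every(Number.isFinite) || radiusMeters <= 0) {

            return res.status(400).json({

                success: false,

                error: 'Latitude, longitude, and radius must be valid numbers'

            });

        }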


        // Calculate time range based on period

        const endTime = new Date();

        let startTime;

        switch (period) {

            case '7d':

                startTime = new Date(endTime.getTime() - 7 * 24 * 60 * 60 * 1000);

                break;

            case '30d':

                startTime = new Date(endTime.getTime() - 30 * 24 * 60 * 60 * 1000);

                break;

            case '90d':

                startTime = new Date(endTime.getTime() - 90 * 24 * 60 * 60 * 1000);

                break;

            default:

                startTime = new Date(endTime.getTime() - 30 * 24 * 60 * 60 * 1000);

        }


        // Aggregate data by day for trend analysis

        const trendData = await SensorData.aggregate([

            {

                $match: {

                    location: {

                        $geoWithin: {

                            $centerSphere: [[lng, lat], radiusMeters / 6378100]

                        }

                    },

                    timestamp: {

                        $gte: startTime,

                        $lte: endTime

                    }

                }

            },

            {

                $group: {

                    _id: {

                        $dateToString: { format: '%Y-%m-%d', date: '$timestamp' }

                    },

                    averageAirQuality: { $avg: '$measurements.airQualityPPM' },

                    minAirQuality: { $min: '$measurements.airQualityPPM' },

                    maxAirQuality: { $max: '$measurements.airQualityPPM' },

                    sampleCount: { $sum: 1 }

                }

            },

            {

                $sort: { _id: 1 }

            }

        ]);


        // Calculate trend statistics

        const trendAnalysis = calculateTrendStatistics(trendData);


        res.json({

            success: true,

            location: {

                latitude: lat,

                longitude: lng,

                radius: radiusMeters

            },

            period: period,

            trendData: trendData,

            trendAnalysis: trendAnalysis

        });


    } catch (error) {

        logger.error('Trend analysis error:', error);

        res.status(500).json({

            success: false,

            error: 'Failed to perform trend analysis'

        });

    }

});


// Anomaly detection

router.get('/anomalies', async (req, res) => {

    try {

        const {

            latitude,

            longitude,

            radius = 1000,

            sensitivity = 'medium'

        } = req.query;


        if (!latitude || !longitude) {

            return res.status(400).json({

                success: false,

                error: 'Latitude and longitude are required'

            });

        }


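        const lat = parseFloat(latitude);

        const lng = parseFloat(longitude);

        const radiusMeters = parseInt(radius, 10);


        // Reject non-numeric input before it reaches the geospatial query

        if (![lat, lng, radiusMeters].every(Number.isFinite) || radiusMeters <= 0) {

            return res.status(400).json({

                success: false,

                error: 'Latitude, longitude, and radius must be valid numbers'

            });

        }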


        // Get recent data for anomaly detection

        const endTime = new Date();

        const startTime = new Date(endTime.getTime() - 24 * 60 * 60 * 1000); // Last 24 hours


        const recentData = await SensorData.find({

            location: {

                $geoWithin: {

                    $centerSphere: [[lng, lat], radiusMeters / 6378100]

                }

            },

            timestamp: {

                $gte: startTime,

                $lte: endTime

            }

        }).sort({ timestamp: -1 });


        // Detect anomalies using statistical methods

        const anomalies = detectAnomalies(recentData, sensitivity);


        res.json({

            success: true,

            location: {

                latitude: lat,

                longitude: lng,

                radius: radiusMeters

            },

            timeRange: {

                startTime: startTime,

                endTime: endTime

            },

            anomalies: anomalies,

            sensitivity: sensitivity

        });


    } catch (error) {

        logger.error('Anomaly detection error:', error);

        res.status(500).json({

            success: false,

            error: 'Failed to detect anomalies'

        });

    }

});


// Helper functions for analysis

function calculateTrendStatistics(trendData) {

    if (trendData.length < 2) {

        return {

            trend: 'insufficient_data',

            changeRate: 0,

            volatility: 0

        };

    }


    const values = trendData.map(d => d.averageAirQuality);

    const firstValue = values[0];

    const lastValue = values[values.length - 1];

    

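    // Percentage change from the first to the last daily average

    // (a zero baseline would cause a division by zero, so report 0 in that case)

    const changeRate = firstValue !== 0 ? ((lastValue - firstValue) / firstValue) * 100 : 0;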

    

    // Calculate volatility (standard deviation)

    const mean = values.reduce((sum, val) => sum + val, 0) / values.length;

    const variance = values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / values.length;

    const volatility = Math.sqrt(variance);

    

    let trend;

    if (Math.abs(changeRate) < 5) {

        trend = 'stable';

    } else if (changeRate > 0) {

        trend = 'increasing';

    } else {

        trend = 'decreasing';

    }

    

    return {

        trend: trend,

        changeRate: Math.round(changeRate * 100) / 100,

        volatility: Math.round(volatility * 100) / 100,

        mean: Math.round(mean * 100) / 100

    };

}


function detectAnomalies(data, sensitivity) {

    if (data.length < 10) {

        return [];

    }


    const values = data.map(d => d.measurements.airQualityPPM);

    const mean = values.reduce((sum, val) => sum + val, 0) / values.length;

    const variance = values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / values.length;

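    const stdDev = Math.sqrt(variance);


    // With zero spread every z-score would be NaN, so there is nothing to flag

    if (stdDev === 0) {

        return [];

    }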

    

    // Set threshold based on sensitivity

    let threshold;

    switch (sensitivity) {

        case 'low':

            threshold = 3;

            break;

        case 'medium':

            threshold = 2.5;

            break;

        case 'high':

            threshold = 2;

            break;

        default:

            threshold = 2.5;

    }

    

    const anomalies = [];

    

    data.forEach((point, index) => {

        const value = point.measurements.airQualityPPM;

        const zScore = Math.abs((value - mean) / stdDev);

        

        if (zScore > threshold) {

            anomalies.push({

                timestamp: point.timestamp,

                deviceId: point.deviceId,

                value: value,

                zScore: Math.round(zScore * 100) / 100,

                severity: zScore > 3 ? 'high' : 'medium',

                location: point.location

            });

        }

    });

    

    return anomalies.sort((a, b) => new Date(b.timestamp) - new Date(a.timestamp));

}


module.exports = router;
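
The rule in `detectAnomalies` is a plain z-score test: a reading is flagged when it lies more than a threshold number of standard deviations from the mean of the window. The standalone sketch below applies the same rule to a small, made-up series so the arithmetic can be followed by hand. `flagOutliers` is an illustrative name, and its default threshold of 2.5 corresponds to the 'medium' sensitivity above.

```javascript
// Flag values whose z-score exceeds the threshold (population standard deviation)
function flagOutliers(values, threshold = 2.5) {
    const mean = values.reduce((sum, v) => sum + v, 0) / values.length;
    const variance = values.reduce((sum, v) => sum + (v - mean) ** 2, 0) / values.length;
    const stdDev = Math.sqrt(variance);

    // A constant series has zero spread, so nothing can be an outlier
    if (stdDev === 0) return [];

    return values
        .map((value, index) => ({ index, value, zScore: Math.abs(value - mean) / stdDev }))
        .filter(point => point.zScore > threshold);
}

// Made-up readings in PPM: eleven ordinary values and one spike at the end
const readings = [400, 402, 398, 401, 399, 403, 397, 400, 402, 398, 401, 520];
console.log(flagOutliers(readings)); // only the final reading (520) is flagged
```

With the population standard deviation, a z-score can never exceed the square root of n - 1 (Samuelson's inequality), so very short windows cannot produce large scores. A window of exactly 10 points, for example, can reach a z-score of at most 3 and therefore never triggers the 'low' sensitivity threshold, which requires a score strictly above 3.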


AI Analysis Service Implementation


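The AI analysis service combines a small TensorFlow.js neural network with statistical routines. It produces a summary of the data, detects daily, weekly, and seasonal patterns, generates predictions and recommendations, and assesses risk.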
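
The service below calls statistical helpers such as `calculateMean`, `calculateMedian`, and `calculateStandardDeviation`, whose bodies are not part of this section. The following is a minimal standalone sketch of what they could look like, assuming the usual textbook definitions and a population standard deviation (dividing by n, as the route helpers do). In the service they would be methods on the class rather than free functions.

```javascript
// Arithmetic mean of a non-empty array of numbers
function calculateMean(values) {
    return values.reduce((sum, v) => sum + v, 0) / values.length;
}

// Median: middle value of the sorted data, or the mean of the two middle values
function calculateMedian(values) {
    const sorted = [...values].sort((a, b) => a - b); // copy so the input is not mutated
    const mid = Math.floor(sorted.length / 2);
    return sorted.length % 2 === 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid];
}

// Population standard deviation: square root of the mean squared deviation
function calculateStandardDeviation(values) {
    const mean = calculateMean(values);
    const variance = calculateMean(values.map(v => (v - mean) ** 2));
    return Math.sqrt(variance);
}

const sample = [2, 4, 4, 4, 5, 5, 7, 9];
console.log(calculateMean(sample));              // 5
console.log(calculateMedian(sample));            // 4.5
console.log(calculateStandardDeviation(sample)); // 2
```

Note the numeric comparator passed to `sort`: without it, JavaScript sorts numbers as strings, which would give a wrong median for values such as 9 and 10.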


Javascript


// services/aiAnalysis.js - AI analysis implementation

const tf = require('@tensorflow/tfjs-node');

const { logger } = require('../utils/logger');


class AIAnalysisService {

    constructor() {

        this.model = null;

        this.isModelLoaded = false;

    }


    async initializeModel() {

        try {

            // In a production environment, load a pre-trained model instead.

            // For this example, we build a small network that is still untrained,

            // so its outputs are not meaningful until it has been fitted to data

            this.model = tf.sequential({

                layers: [

                    tf.layers.dense({ inputShape: [4], units: 16, activation: 'relu' }),

                    tf.layers.dense({ units: 8, activation: 'relu' }),

                    tf.layers.dense({ units: 1, activation: 'linear' })

                ]

            });


            this.model.compile({

                optimizer: 'adam',

                loss: 'meanSquaredError',

                metrics: ['mae']

            });


            this.isModelLoaded = true;

            logger.info('AI model initialized successfully');

        } catch (error) {

            logger.error('Failed to initialize AI model:', error);

        }

    }


    async performAIAnalysis(sensorData, options = {}) {

        try {

            if (!this.isModelLoaded) {

                await this.initializeModel();

            }


            const analysis = {

                summary: await this.generateSummary(sensorData),

                patterns: await this.detectPatterns(sensorData),

                predictions: await this.generatePredictions(sensorData),

                recommendations: await this.generateRecommendations(sensorData),

                riskAssessment: await this.assessRisk(sensorData)

            };


            return analysis;

        } catch (error) {

            logger.error('AI analysis failed:', error);

            throw error;

        }

    }


    async generateSummary(sensorData) {

        const values = sensorData.map(d => d.measurements.airQualityPPM);

        const timestamps = sensorData.map(d => new Date(d.timestamp));

        

        const summary = {

            totalDataPoints: values.length,

            timeSpan: {

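                // Math.min/Math.max return epoch milliseconds, so wrap them back into Date objects

                start: new Date(Math.min(...timestamps)),

                end: new Date(Math.max(...timestamps))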

            },

            statistics: {

                mean: this.calculateMean(values),

                median: this.calculateMedian(values),

                standardDeviation: this.calculateStandardDeviation(values),

                minimum: Math.min(...values),

                maximum: Math.max(...values)

            },

            airQualityLevel: this.categorizeAirQuality(this.calculateMean(values))

        };


        return summary;

    }


    async detectPatterns(sensorData) {

        const patterns = {

            dailyPattern: await this.analyzeDailyPattern(sensorData),

            weeklyPattern: await this.analyzeWeeklyPattern(sensorData),

            seasonalTrend: await this.analyzeSeasonalTrend(sensorData),

            correlations: await this.analyzeCorrelations(sensorData)

        };


        return patterns;

    }


    async analyzeDailyPattern(sensorData) {

        const hourlyData = {};

        

        sensorData.forEach(point => {

            const hour = new Date(point.timestamp).getHours();

            if (!hourlyData[hour]) {

                hourlyData[hour] = [];

            }

            hourlyData[hour].push(point.measurements.airQualityPPM);

        });


        const hourlyAverages = {};

        for (let hour = 0; hour < 24; hour++) {

            if (hourlyData[hour] && hourlyData[hour].length > 0) {

                hourlyAverages[hour] = this.calculateMean(hourlyData[hour]);

            } else {

                hourlyAverages[hour] = null;

            }

        }


        // Identify peak pollution hours

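        const validHours = Object.entries(hourlyAverages)

            .filter(([, avg]) => avg !== null)

            .map(([hour, avg]) => ({ hour: parseInt(hour, 10), average: avg }));


        // Without any readings there is no peak or low hour to report

        if (validHours.length === 0) {

            return {

                hourlyAverages: hourlyAverages,

                peakPollutionHour: null,

                lowestPollutionHour: null,

                dailyVariation: null

            };

        }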


        const peakHour = validHours.reduce((max, current) => 

            current.average > max.average ? current : max, validHours[0]);


        const lowHour = validHours.reduce((min, current) => 

            current.average < min.average ? current : min, validHours[0]);


        return {

            hourlyAverages: hourlyAverages,

            peakPollutionHour: peakHour,

            lowestPollutionHour: lowHour,

            dailyVariation: peakHour.average - lowHour.average

        };

    }


    async analyzeWeeklyPattern(sensorData) {

        const weeklyData = {};

        const dayNames = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'];

        

        sensorData.forEach(point => {

            const dayOfWeek = new Date(point.timestamp).getDay();

            if (!weeklyData[dayOfWeek]) {

                weeklyData[dayOfWeek] = [];

            }

            weeklyData[dayOfWeek].push(point.measurements.airQualityPPM);

        });


        const weeklyAverages = {};

        for (let day = 0; day < 7; day++) {

            if (weeklyData[day] && weeklyData[day].length > 0) {

                weeklyAverages[dayNames[day]] = this.calculateMean(weeklyData[day]);

            }

        }


        return {

            weeklyAverages: weeklyAverages,

            weekendVsWeekday: this.compareWeekendVsWeekday(weeklyData)

        };

    }


    async analyzeSeasonalTrend(sensorData) {

        // Group data by month

        const monthlyData = {};

        

        sensorData.forEach(point => {

            const month = new Date(point.timestamp).getMonth();

            if (!monthlyData[month]) {

                monthlyData[month] = [];

            }

            monthlyData[month].push(point.measurements.airQualityPPM);

        });


        const monthlyAverages = {};

        const monthNames = [

            'January', 'February', 'March', 'April', 'May', 'June',

            'July', 'August', 'September', 'October', 'November', 'December'

        ];


        for (let month = 0; month < 12; month++) {

            if (monthlyData[month] && monthlyData[month].length > 0) {

                monthlyAverages[monthNames[month]] = this.calculateMean(monthlyData[month]);

            }

        }


        return {

            monthlyAverages: monthlyAverages,

            seasonalVariation: this.calculateSeasonalVariation(monthlyAverages)

        };

    }


    async analyzeCorrelations(sensorData) {

        // Analyze correlations between air quality and other factors

        const correlations = {};

        

        // Correlate only readings where both values are present, so the two

        // series stay aligned and have equal length

        const temperaturePairs = sensorData.filter(d => d.measurements.temperature !== undefined);

        if (temperaturePairs.length > 0) {

            correlations.temperatureCorrelation = this.calculateCorrelation(

                temperaturePairs.map(d => d.measurements.airQualityPPM),

                temperaturePairs.map(d => d.measurements.temperature)

            );

        }


        const humidityPairs = sensorData.filter(d => d.measurements.humidity !== undefined);

        if (humidityPairs.length > 0) {

            correlations.humidityCorrelation = this.calculateCorrelation(

                humidityPairs.map(d => d.measurements.airQualityPPM),

                humidityPairs.map(d => d.measurements.humidity)

            );

        }


        return correlations;

    }


    async generatePredictions(sensorData) {

        if (sensorData.length < 10) {

            return {

                error: 'Insufficient data for predictions',

                predictions: null

            };

        }


        // Simple time series prediction using moving average

        const values = sensorData.map(d => d.measurements.airQualityPPM);

        const recentValues = values.slice(-10); // Last 10 values

        const movingAverage = this.calculateMean(recentValues);

        

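        // Calculate trend as the change between the means of the two 5-sample halves.

        // For a single device reporting every 10 minutes the halves are 50 minutes

        // apart, so this roughly approximates the change per hour.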

        const firstHalf = recentValues.slice(0, 5);

        const secondHalf = recentValues.slice(5);

        const trend = this.calculateMean(secondHalf) - this.calculateMean(firstHalf);


        const predictions = [];

        for (let i = 1; i <= 6; i++) { // Predict next 6 hours

            const predictedValue = movingAverage + (trend * i);

            predictions.push({

                hoursAhead: i,

                predictedValue: Math.max(0, predictedValue),

                confidence: Math.max(0.1, 1 - (i * 0.15)) // Heuristic score that decays with the horizon, not a statistical confidence level

            });

        }


        return {

            method: 'moving_average_with_trend',

            predictions: predictions,

            baseValue: movingAverage,

            trend: trend

        };

    }


    async generateRecommendations(sensorData) {

        const currentLevel = this.calculateMean(

            sensorData.slice(-5).map(d => d.measurements.airQualityPPM)

        );

        

        const recommendations = [];


        if (currentLevel > 150) {

            recommendations.push({

                priority: 'high',

                category: 'health',

                message: 'Air quality is unhealthy. Avoid outdoor activities and keep windows closed.'

            });

            recommendations.push({

                priority: 'high',

                category: 'action',

                message: 'Consider using air purifiers indoors and wearing masks when going outside.'

            });

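        } else if (currentLevel > 100) {

            recommendations.push({

                priority: 'medium',

                category: 'health',

                message: 'Air quality is unhealthy for sensitive groups. Sensitive individuals should limit outdoor activities.'

            });

        } else if (currentLevel > 50) {

            // Matches the 'Moderate' band used by categorizeAirQuality

            recommendations.push({

                priority: 'low',

                category: 'health',

                message: 'Air quality is moderate. Unusually sensitive individuals should consider limiting prolonged outdoor exertion.'

            });

        } else {

            recommendations.push({

                priority: 'low',

                category: 'health',

                message: 'Air quality is good. Safe for all outdoor activities.'

            });

        }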


        // Add monitoring recommendations

        if (sensorData.length < 50) {

            recommendations.push({

                priority: 'medium',

                category: 'monitoring',

                message: 'More data collection recommended for better analysis accuracy.'

            });

        }


        return recommendations;

    }


    async assessRisk(sensorData) {

        const values = sensorData.map(d => d.measurements.airQualityPPM);

        const currentLevel = this.calculateMean(values.slice(-5));

        const maxLevel = Math.max(...values);

        const volatility = this.calculateStandardDeviation(values);


        let riskLevel;

        let riskScore = 0;


        // Base risk on current level

        if (currentLevel > 200) riskScore += 40;

        else if (currentLevel > 150) riskScore += 30;

        else if (currentLevel > 100) riskScore += 20;

        else if (currentLevel > 50) riskScore += 10;


        // Add risk based on maximum observed level

        if (maxLevel > 300) riskScore += 30;

        else if (maxLevel > 200) riskScore += 20;

        else if (maxLevel > 150) riskScore += 10;


        // Add risk based on volatility

        if (volatility > 50) riskScore += 20;

        else if (volatility > 30) riskScore += 10;


        // Determine risk level

        if (riskScore >= 70) riskLevel = 'very_high';

        else if (riskScore >= 50) riskLevel = 'high';

        else if (riskScore >= 30) riskLevel = 'medium';

        else if (riskScore >= 15) riskLevel = 'low';

        else riskLevel = 'very_low';


        return {

            riskLevel: riskLevel,

            riskScore: riskScore,

            factors: {

                currentLevel: currentLevel,

                maxLevel: maxLevel,

                volatility: volatility

            }

        };

    }


    // Utility methods

    calculateMean(values) {

        return values.reduce((sum, val) => sum + val, 0) / values.length;

    }


    calculateMedian(values) {

        const sorted = values.slice().sort((a, b) => a - b);

        const middle = Math.floor(sorted.length / 2);

        

        if (sorted.length % 2 === 0) {

            return (sorted[middle - 1] + sorted[middle]) / 2;

        } else {

            return sorted[middle];

        }

    }


    calculateStandardDeviation(values) {

        const mean = this.calculateMean(values);

        const variance = values.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / values.length;

        return Math.sqrt(variance);

    }


    calculateCorrelation(x, y) {

        if (x.length !== y.length || x.length === 0) return 0;

        

        const meanX = this.calculateMean(x);

        const meanY = this.calculateMean(y);

        

        let numerator = 0;

        let denomX = 0;

        let denomY = 0;

        

        for (let i = 0; i < x.length; i++) {

            const deltaX = x[i] - meanX;

            const deltaY = y[i] - meanY;

            

            numerator += deltaX * deltaY;

            denomX += deltaX * deltaX;

            denomY += deltaY * deltaY;

        }

        

        const denominator = Math.sqrt(denomX * denomY);

        return denominator === 0 ? 0 : numerator / denominator;

    }


    categorizeAirQuality(ppm) {

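        // Guard against NaN, e.g. the mean of an empty data set

        if (!Number.isFinite(ppm)) return 'Unknown';

        if (ppm <= 50) return 'Good';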

        if (ppm <= 100) return 'Moderate';

        if (ppm <= 150) return 'Unhealthy for Sensitive Groups';

        if (ppm <= 200) return 'Unhealthy';

        if (ppm <= 300) return 'Very Unhealthy';

        return 'Hazardous';

    }


    compareWeekendVsWeekday(weeklyData) {

        const weekdayData = [];

        const weekendData = [];

        

        for (let day = 0; day < 7; day++) {

            if (weeklyData[day]) {

                if (day === 0 || day === 6) { // Sunday or Saturday

                    weekendData.push(...weeklyData[day]);

                } else {

                    weekdayData.push(...weeklyData[day]);

                }

            }

        }

        

        return {

            weekdayAverage: weekdayData.length > 0 ? this.calculateMean(weekdayData) : null,

            weekendAverage: weekendData.length > 0 ? this.calculateMean(weekendData) : null,

            difference: weekdayData.length > 0 && weekendData.length > 0 ? 

                       this.calculateMean(weekdayData) - this.calculateMean(weekendData) : null

        };

    }


    calculateSeasonalVariation(monthlyAverages) {

        const values = Object.values(monthlyAverages).filter(v => v !== undefined);

        if (values.length === 0) return null;

        

        return {

            highestMonth: Object.entries(monthlyAverages).reduce((max, [month, avg]) => 

                avg > max.average ? { month, average: avg } : max, 

                { month: '', average: -Infinity }),

            lowestMonth: Object.entries(monthlyAverages).reduce((min, [month, avg]) => 

                avg < min.average ? { month, average: avg } : min, 

                { month: '', average: Infinity }),

            seasonalRange: Math.max(...values) - Math.min(...values)

        };

    }

}


// Shared instance backing the exported helper function

const aiAnalysisService = new AIAnalysisService();


async function performAIAnalysis(sensorData, options) {

    return await aiAnalysisService.performAIAnalysis(sensorData, options);

}


module.exports = {

    performAIAnalysis,

    AIAnalysisService

};
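The forecasting idea used in generatePredictions can be tried in isolation. The following is a minimal sketch under stated assumptions, not the service's own code: forecastMovingAverage and its parameters are hypothetical names, readings are assumed to be evenly spaced, and the horizon is expressed in sample steps rather than hours. Unlike the method above, it divides the half-window difference by the distance between the halves and projects from the centre of the window, so a perfectly linear series is extrapolated exactly.

```javascript
// Hypothetical standalone helper illustrating a moving average with trend.
function mean(values) {
    return values.reduce((sum, v) => sum + v, 0) / values.length;
}

function forecastMovingAverage(values, windowSize = 10, steps = 6) {
    if (windowSize < 2 || values.length < windowSize) {
        return null; // not enough data to estimate a trend
    }
    const recent = values.slice(-windowSize);
    const half = Math.floor(windowSize / 2);
    const firstHalf = recent.slice(0, half);
    const secondHalf = recent.slice(windowSize - half);

    // The centres of the two halves are (windowSize - half) samples apart,
    // so this is the average change per sample step.
    const slopePerStep = (mean(secondHalf) - mean(firstHalf)) / (windowSize - half);

    // The window mean sits at the centre of the window, which is
    // (windowSize - 1) / 2 steps before the most recent sample.
    const base = mean(recent);
    const centreOffset = (windowSize - 1) / 2;

    const forecast = [];
    for (let i = 1; i <= steps; i++) {
        forecast.push(Math.max(0, base + slopePerStep * (centreOffset + i)));
    }
    return forecast;
}

// Usage: a series rising by 1 per sample continues the same line.
console.log(forecastMovingAverage([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
// [ 10, 11, 12, 13, 14, 15 ]
```

Expressing the horizon in steps keeps the helper independent of the reporting interval; the caller can convert steps to wall-clock time once the interval is known.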


System Configuration and Deployment


Database Configuration


The database configuration sets the MongoDB connection URI, connection pool size, and timeout settings, and registers handlers for connection events and graceful shutdown.


Javascript


// config/database.js - Database configuration

const mongoose = require('mongoose');

const { logger } = require('../utils/logger');


const connectDatabase = async () => {

    try {

        const mongoURI = process.env.MONGODB_URI || 'mongodb://localhost:27017/air_quality_monitoring';

        

        // Connection options. On Mongoose 5.x, also set useNewUrlParser and

        // useUnifiedTopology to true; later releases always behave that way.

        // bufferMaxEntries is omitted because newer MongoDB drivers reject it.

        const options = {

            maxPoolSize: 10,

            serverSelectionTimeoutMS: 5000,

            socketTimeoutMS: 45000,

            family: 4,

            bufferCommands: false

        };


        // Register event handlers before connecting so the initial

        // 'connected' event is not missed

        mongoose.connection.on('connected', () => {

            logger.info('MongoDB connected successfully');

        });


        mongoose.connection.on('error', (error) => {

            logger.error('MongoDB connection error:', error);

        });


        mongoose.connection.on('disconnected', () => {

            logger.warn('MongoDB disconnected');

        });


        await mongoose.connect(mongoURI, options);


        // Graceful shutdown

        process.on('SIGINT', async () => {

            await mongoose.connection.close();

            logger.info('MongoDB connection closed through app termination');

            process.exit(0);

        });


    } catch (error) {

        logger.error('Database connection failed:', error);

        throw error;

    }

};


module.exports = { connectDatabase };
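Because connectDatabase rethrows when the initial connection fails, a caller may want to retry before giving up, for example when the server starts before MongoDB is ready. The sketch below shows one way to do this with exponential backoff. The helper name connectWithRetry and its options are assumptions made for illustration; they are not part of Mongoose or of the server code above.

```javascript
// Hypothetical helper: retries an async connect function with exponential backoff.
async function connectWithRetry(connectFn, { retries = 5, baseDelayMs = 1000 } = {}) {
    let lastError;
    for (let attempt = 0; attempt <= retries; attempt++) {
        try {
            return await connectFn();
        } catch (error) {
            lastError = error;
            if (attempt === retries) {
                break; // no attempts left
            }
            // Delay doubles after every failure: base, 2x base, 4x base, ...
            const delayMs = baseDelayMs * 2 ** attempt;
            await new Promise(resolve => setTimeout(resolve, delayMs));
        }
    }
    throw lastError;
}

// Usage (assuming connectDatabase from config/database.js is in scope):
// await connectWithRetry(connectDatabase, { retries: 5, baseDelayMs: 1000 });
```

Passing the connect function as an argument keeps the retry logic independent of the database driver, which also makes it easy to exercise with a fake function in tests.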


Validation Middleware


The validation middleware ensures data integrity and security by validating incoming requests from sensor devices and user queries.


Javascript


// middleware/validation.js - Request validation middleware

const { body, query, validationResult } = require('express-validator');

const { logger } = require('../utils/logger');


// Device registration validation

const validateDeviceRegistration = [

    body('device_id')

        .isLength({ min: 8, max: 64 })

        .matches(/^[A-Za-z0-9]+$/)

        .withMessage('Device ID must be 8-64 alphanumeric characters'),

    

    body('device_type')

        .optional()

        .isIn(['air_quality_sensor', 'weather_station', 'noise_monitor'])

        .withMessage('Invalid device type'),

    

    body('latitude')

        .isFloat({ min: -90, max: 90 })

        .withMessage('Latitude must be between -90 and 90'),

    

    body('longitude')

        .isFloat({ min: -180, max: 180 })

        .withMessage('Longitude must be between -180 and 180'),

    

    body('sensor_model')

        .isLength({ min: 2, max: 50 })

        .withMessage('Sensor model must be 2-50 characters'),

    

    body('firmware_version')

        .matches(/^\d+\.\d+\.\d+$/)

        .withMessage('Firmware version must be in format x.y.z'),

    

    (req, res, next) => {

        const errors = validationResult(req);

        if (!errors.isEmpty()) {

            logger.warn('Device registration validation failed:', errors.array());

            return res.status(400).json({

                success: false,

                error: 'Validation failed',

                details: errors.array()

            });

        }

        next();

    }

];


// Sensor data validation

const validateSensorData = [

    body('device_id')

        .isLength({ min: 8, max: 64 })

        .matches(/^[A-Za-z0-9]+$/)

        .withMessage('Invalid device ID'),

    

    body('air_quality_ppm')

        .isFloat({ min: 0, max: 10000 })

        .withMessage('Air quality PPM must be between 0 and 10000'),

    

    body('latitude')

        .isFloat({ min: -90, max: 90 })

        .withMessage('Invalid latitude'),

    

    body('longitude')

        .isFloat({ min: -180, max: 180 })

        .withMessage('Invalid longitude'),

    

    body('timestamp')

        .optional()

        .isISO8601()

        .withMessage('Invalid timestamp format'),

    

    body('temperature')

        .optional()

        .isFloat({ min: -50, max: 100 })

        .withMessage('Temperature must be between -50 and 100 Celsius'),

    

    body('humidity')

        .optional()

        .isFloat({ min: 0, max: 100 })

        .withMessage('Humidity must be between 0 and 100 percent'),

    

    body('sensor_type')

        .optional()

        .isIn(['MQ135', 'MQ7', 'BME280', 'DHT22'])

        .withMessage('Invalid sensor type'),

    

    (req, res, next) => {

        const errors = validationResult(req);

        if (!errors.isEmpty()) {

            logger.warn('Sensor data validation failed:', errors.array());

            return res.status(400).json({

                success: false,

                error: 'Invalid sensor data',

                details: errors.array()

            });

        }

        next();

    }

];


module.exports = {

    validateDeviceRegistration,

    validateSensorData

};
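The core sensor-data rules above can also be mirrored in a dependency-free function, which is convenient for unit tests or for checking payloads outside an Express request. This is a sketch only: checkSensorPayload is a hypothetical name, it covers a subset of the fields, and it is stricter than the middleware in one respect, since it accepts only JavaScript numbers, whereas express-validator's isFloat also accepts numeric strings.

```javascript
// Hypothetical, dependency-free mirror of the main validateSensorData rules.
function checkSensorPayload(payload) {
    const errors = [];
    const inRange = (value, min, max) =>
        typeof value === 'number' && Number.isFinite(value) && value >= min && value <= max;

    // 8-64 alphanumeric characters, as in the middleware
    if (typeof payload.device_id !== 'string' || !/^[A-Za-z0-9]{8,64}$/.test(payload.device_id)) {
        errors.push('Invalid device ID');
    }
    if (!inRange(payload.air_quality_ppm, 0, 10000)) {
        errors.push('Air quality PPM must be between 0 and 10000');
    }
    if (!inRange(payload.latitude, -90, 90)) {
        errors.push('Invalid latitude');
    }
    if (!inRange(payload.longitude, -180, 180)) {
        errors.push('Invalid longitude');
    }
    // Optional fields are checked only when present
    if (payload.temperature !== undefined && !inRange(payload.temperature, -50, 100)) {
        errors.push('Temperature must be between -50 and 100 Celsius');
    }
    if (payload.humidity !== undefined && !inRange(payload.humidity, 0, 100)) {
        errors.push('Humidity must be between 0 and 100 percent');
    }
    return errors;
}

// Usage: an empty array means the payload passed every check.
const sampleErrors = checkSensorPayload({
    device_id: 'ABCDEF1234567890',
    air_quality_ppm: 42.5,
    latitude: 40.7128,
    longitude: -74.0060
});
console.log(sampleErrors.length === 0 ? 'payload ok' : sampleErrors);
```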


Utility Functions


The utility functions provide common functionality for air quality calculations, logging, and system operations.


Javascript


// utils/airQuality.js - Air quality calculation utilities

function calculateAirQualityIndex(ppm) {

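    // Simplified piecewise-linear AQI. The breakpoints below are ones the U.S. EPA

    // has published for PM2.5 (in µg/m³), so applying them to a raw gas-sensor

    // PPM reading yields only a rough, uncalibrated index.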

    if (ppm <= 12) return Math.round((50 / 12) * ppm);

    if (ppm <= 35.4) return Math.round(((100 - 51) / (35.4 - 12.1)) * (ppm - 12.1) + 51);

    if (ppm <= 55.4) return Math.round(((150 - 101) / (55.4 - 35.5)) * (ppm - 35.5) + 101);

    if (ppm <= 150.4) return Math.round(((200 - 151) / (150.4 - 55.5)) * (ppm - 55.5) + 151);

    if (ppm <= 250.4) return Math.round(((300 - 201) / (250.4 - 150.5)) * (ppm - 150.5) + 201);

    // Cap the result at 500, the top of the AQI scale

    return Math.min(500, Math.round(((500 - 301) / (500.4 - 250.5)) * (ppm - 250.5) + 301));

}


function getAirQualityCategory(aqi) {

    if (aqi <= 50) return 'Good';

    if (aqi <= 100) return 'Moderate';

    if (aqi <= 150) return 'Unhealthy for Sensitive Groups';

    if (aqi <= 200) return 'Unhealthy';

    if (aqi <= 300) return 'Very Unhealthy';

    return 'Hazardous';

}


function getHealthRecommendation(aqi) {

    if (aqi <= 50) {

        return 'Air quality is satisfactory, and air pollution poses little or no risk.';

    } else if (aqi <= 100) {

        return 'Air quality is acceptable. However, there may be a risk for some people, particularly those who are unusually sensitive to air pollution.';

    } else if (aqi <= 150) {

        return 'Members of sensitive groups may experience health effects. The general public is less likely to be affected.';

    } else if (aqi <= 200) {

        return 'Some members of the general public may experience health effects; members of sensitive groups may experience more serious health effects.';

    } else if (aqi <= 300) {

        return 'Health alert: The risk of health effects is increased for everyone.';

    } else {

        return 'Health warning of emergency conditions: everyone is more likely to be affected.';

    }

}


module.exports = {

    calculateAirQualityIndex,

    getAirQualityCategory,

    getHealthRecommendation

};
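Each branch of calculateAirQualityIndex applies the same linear interpolation, AQI = (I_high - I_low) / (C_high - C_low) * (C - C_low) + I_low, with a different pair of breakpoints. A table-driven variant makes that structure explicit. The sketch below reuses the breakpoint values from the function above; the names AQI_BREAKPOINTS and aqiFromBreakpoints are hypothetical, and capping the result at 500 and returning null for invalid input are choices made for this example.

```javascript
// Breakpoint values copied from calculateAirQualityIndex; names are illustrative.
const AQI_BREAKPOINTS = [
    { cLow: 0.0,   cHigh: 12.0,  iLow: 0,   iHigh: 50 },
    { cLow: 12.1,  cHigh: 35.4,  iLow: 51,  iHigh: 100 },
    { cLow: 35.5,  cHigh: 55.4,  iLow: 101, iHigh: 150 },
    { cLow: 55.5,  cHigh: 150.4, iLow: 151, iHigh: 200 },
    { cLow: 150.5, cHigh: 250.4, iLow: 201, iHigh: 300 },
    { cLow: 250.5, cHigh: 500.4, iLow: 301, iHigh: 500 }
];

function aqiFromBreakpoints(concentration) {
    if (!Number.isFinite(concentration) || concentration < 0) {
        return null; // reject NaN, Infinity, and negative readings
    }
    // First row whose upper bound covers the reading; readings above the
    // table fall back to the last row and are capped below.
    const row = AQI_BREAKPOINTS.find(bp => concentration <= bp.cHigh)
        || AQI_BREAKPOINTS[AQI_BREAKPOINTS.length - 1];

    const slope = (row.iHigh - row.iLow) / (row.cHigh - row.cLow);
    const aqi = slope * (concentration - row.cLow) + row.iLow;
    return Math.min(500, Math.round(aqi));
}

// Usage
console.log(aqiFromBreakpoints(12));   // 50
console.log(aqiFromBreakpoints(35.4)); // 100
```

Keeping the breakpoints in data rather than in code means a different pollutant table can be swapped in without touching the interpolation logic.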



Javascript


// utils/logger.js - Logging utility

const winston = require('winston');


const logger = winston.createLogger({

    level: process.env.LOG_LEVEL || 'info',

    format: winston.format.combine(

        winston.format.timestamp(),

        winston.format.errors({ stack: true }),

        winston.format.json()

    ),

    defaultMeta: { service: 'air-quality-server' },

    transports: [

        new winston.transports.File({ 

            filename: 'logs/error.log', 

            level: 'error',

            maxsize: 5242880, // 5MB

            maxFiles: 5

        }),

        new winston.transports.File({ 

            filename: 'logs/combined.log',

            maxsize: 5242880, // 5MB

            maxFiles: 5

        })

    ]

});


if (process.env.NODE_ENV !== 'production') {

    logger.add(new winston.transports.Console({

        format: winston.format.combine(

            winston.format.colorize(),

            winston.format.simple()

        )

    }));

}


module.exports = { logger };



Complete Working Example


This section provides a complete, deployable example of the IoT air pollution monitoring system including all necessary configuration files and deployment scripts.


ESP32 Complete Firmware (C++)


// complete_esp32_firmware.ino - Complete ESP32 air quality sensor firmware

#include <WiFi.h>

#include <HTTPClient.h>

#include <ArduinoJson.h>

#include <EEPROM.h>

#include <esp_system.h>


// Configuration constants - Update these for your deployment

const char* WIFI_SSID = "YourWiFiNetwork";

const char* WIFI_PASSWORD = "YourWiFiPassword";

const char* SERVER_URL = "http://your-server.com/api/data/sensor-data";

const char* REGISTRATION_URL = "http://your-server.com/api/devices/register";


// Hardware configuration

const int SENSOR_PIN = 34;

const int LED_PIN = 2;

const int MEASUREMENT_INTERVAL = 600000; // 10 minutes

const int MAX_RETRY_ATTEMPTS = 3;

const int WIFI_TIMEOUT = 15000;

const int SENSOR_WARMUP_TIME = 30000;


// Device location (in production, use GPS module)

const float DEVICE_LATITUDE = 40.7128;   // New York City example

const float DEVICE_LONGITUDE = -74.0060;


// Global variables

String deviceUUID;

unsigned long lastMeasurementTime = 0;

bool systemInitialized = false;


// Sensor calibration constants

const float RLOAD = 10.0;

const float RZERO = 76.63;

const float PARA = 116.6020682;

const float PARB = 2.769034857;


class SystemManager {

private:

    bool wifiConnected;

    int connectionAttempts;

    

public:

    SystemManager() : wifiConnected(false), connectionAttempts(0) {}

    

    void initialize() {

        Serial.begin(115200);

        Serial.println("\n=== ESP32 Air Quality Sensor System ===");

        Serial.println("Firmware Version: 1.0.0");

        Serial.println("Initializing system components...");

        

        // Initialize hardware

        pinMode(SENSOR_PIN, INPUT);

        pinMode(LED_PIN, OUTPUT);

        digitalWrite(LED_PIN, LOW);

        

        // Initialize EEPROM

        EEPROM.begin(512);

        

        // Generate or load device UUID

        deviceUUID = getOrCreateDeviceUUID();

        Serial.println("Device UUID: " + deviceUUID);

        

        // Initialize WiFi

        if (initializeWiFi()) {

            Serial.println("WiFi initialization successful");

            registerDevice();

            warmupSensor();

            systemInitialized = true;

            digitalWrite(LED_PIN, HIGH); // Indicate ready state

        } else {

            Serial.println("WiFi initialization failed - entering deep sleep");

            enterDeepSleep();

        }

    }

    

    bool initializeWiFi() {

        WiFi.mode(WIFI_STA);

        WiFi.begin(WIFI_SSID, WIFI_PASSWORD);

        

        Serial.print("Connecting to WiFi");

        unsigned long startTime = millis();

        

        while (WiFi.status() != WL_CONNECTED && 

               (millis() - startTime) < WIFI_TIMEOUT) {

            delay(500);

            Serial.print(".");

        }

        

        if (WiFi.status() == WL_CONNECTED) {

            Serial.println("\nWiFi connected successfully");

            Serial.print("IP Address: ");

            Serial.println(WiFi.localIP());

            Serial.print("Signal Strength: ");

            Serial.print(WiFi.RSSI());

            Serial.println(" dBm");

            wifiConnected = true;

            connectionAttempts = 0;

            return true;

        } else {

            Serial.println("\nWiFi connection failed");

            connectionAttempts++;

            wifiConnected = false;

            return false;

        }

    }

    

    bool ensureWiFiConnection() {

        if (WiFi.status() != WL_CONNECTED) {

            Serial.println("WiFi disconnected, attempting reconnection...");

            return initializeWiFi();

        }

        return true;

    }

    

    void registerDevice() {

        Serial.println("Registering device with server...");

        

        HTTPClient http;

        http.begin(REGISTRATION_URL);

        http.addHeader("Content-Type", "application/json");

        http.setTimeout(15000);

        

        DynamicJsonDocument doc(512);

        doc["device_id"] = deviceUUID;

        doc["device_type"] = "air_quality_sensor";

        doc["latitude"] = DEVICE_LATITUDE;

        doc["longitude"] = DEVICE_LONGITUDE;

        doc["sensor_model"] = "MQ135";

        doc["firmware_version"] = "1.0.0";

        

        String jsonString;

        serializeJson(doc, jsonString);

        

        int httpResponseCode = http.POST(jsonString);

        

        if (httpResponseCode >= 200 && httpResponseCode < 300) {

            Serial.println("Device registration successful");

        } else {

            Serial.print("Device registration failed. HTTP Code: ");

            Serial.println(httpResponseCode);

            Serial.println("Continuing with measurements...");

        }

        

        http.end();

    }

    

    void warmupSensor() {

        Serial.println("Warming up air quality sensor...");

        unsigned long startTime = millis();

        

        while ((millis() - startTime) < SENSOR_WARMUP_TIME) {

            unsigned long elapsed = (millis() - startTime) / 1000;

            Serial.print("Sensor warmup: ");

            Serial.print(elapsed);

            Serial.print("/");

            Serial.print(SENSOR_WARMUP_TIME / 1000);

            Serial.println(" seconds");

            

            // Blink LED during warmup

            digitalWrite(LED_PIN, !digitalRead(LED_PIN));

            delay(1000);

        }

        

        digitalWrite(LED_PIN, HIGH);

        Serial.println("Sensor warmup complete");

    }

    

    void enterDeepSleep() {

        Serial.println("Entering deep sleep mode...");

        digitalWrite(LED_PIN, LOW);

        esp_deep_sleep(MEASUREMENT_INTERVAL * 1000ULL);

    }

    

    bool isSystemReady() {

        return systemInitialized && wifiConnected;

    }

    

private:

    String getOrCreateDeviceUUID() {

        String uuid = loadUUIDFromEEPROM();

        if (uuid.length() == 0) {

            uuid = generateUUID();

            saveUUIDToEEPROM(uuid);

            Serial.println("Generated new device UUID");

        } else {

            Serial.println("Loaded existing UUID from EEPROM");

        }

        return uuid;

    }

    

    String generateUUID() {

        uint64_t chipId = ESP.getEfuseMac();

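        // The upper bound stays within the signed 32-bit range of random();

        // 0xFFFFFFFF would wrap to -1 and make the random part constant

        String uuid = String((uint32_t)(chipId >> 32), HEX) + 

                     String((uint32_t)chipId, HEX) + 

                     String(random(0x10000000, 0x7FFFFFFF), HEX);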

        uuid.toUpperCase();

        return uuid;

    }

    

    void saveUUIDToEEPROM(const String& uuid) {

        for (int i = 0; i < uuid.length() && i < 32; i++) {

            EEPROM.write(i, uuid[i]);

        }

        EEPROM.write(uuid.length(), '\0');

        EEPROM.commit();

    }

    

    String loadUUIDFromEEPROM() {

        String uuid = "";

        char ch;

        for (int i = 0; i < 32; i++) {

            ch = EEPROM.read(i);

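            if (ch == '\0') break;

            // Erased flash reads 0xFF; treat any non-hex byte as "no UUID stored"

            if (!isxdigit((unsigned char)ch)) return "";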

            uuid += ch;

        }

        return uuid;

    }

};


class AirQualitySensor {

private:

    int pin;

    bool isWarmedUp;

    

public:

    AirQualitySensor(int sensorPin) : pin(sensorPin), isWarmedUp(true) {}

    

    float readAirQualityPPM() {

        const int numReadings = 10;

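        float totalResistance = 0.0;

        int validReadings = 0;

        

        // Take multiple readings for averaging

        for (int i = 0; i < numReadings; i++) {

            int analogValue = analogRead(pin);

            float voltage = (analogValue / 4095.0) * 3.3;

            float resistance = calculateResistance(voltage);

            

            if (resistance > 0) {

                totalResistance += resistance;

                validReadings++;

            }

            

            delay(100);

        }

        

        // Average only the readings that produced a usable resistance

        if (validReadings == 0) {

            Serial.println("Warning: No valid sensor readings");

            return -1.0;

        }

        

        float avgResistance = totalResistance / validReadings;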

        float ratio = avgResistance / RZERO;

        float ppm = PARA * pow(ratio, -PARB);

        

        // Sanity check

        if (ppm < 0 || ppm > 1000) {

            Serial.println("Warning: Sensor reading out of expected range");

            return -1.0;

        }

        

        return ppm;

    }

    

private:

    float calculateResistance(float voltage) {

        if (voltage <= 0.0 || voltage >= 3.3) {

            return 0.0;

        }

        return ((3.3 - voltage) / voltage) * RLOAD;

    }

};


class DataTransmitter {

private:

    String serverURL;

    

public:

    DataTransmitter(const String& url) : serverURL(url) {}

    

    bool transmitData(const String& deviceId, float ppm, float lat, float lng) {

        if (ppm < 0) {

            Serial.println("Invalid sensor reading - skipping transmission");

            return false;

        }

        

        HTTPClient http;

        http.begin(serverURL);

        http.addHeader("Content-Type", "application/json");

        http.addHeader("User-Agent", "ESP32-AirQuality/1.0");

        http.setTimeout(20000);

        

        DynamicJsonDocument doc(1024);

        doc["device_id"] = deviceId;

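        // The server validates this optional field as ISO 8601, so send it only

        // when a real time is available; the millis() placeholder would be rejected

        String timestamp = getCurrentTimestamp();

        if (timestamp.indexOf('T') > 0) {

            doc["timestamp"] = timestamp;

        }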

        doc["air_quality_ppm"] = ppm;

        doc["latitude"] = lat;

        doc["longitude"] = lng;

        doc["sensor_type"] = "MQ135";

        doc["firmware_version"] = "1.0.0";

        

        String jsonString;

        serializeJson(doc, jsonString);

        

        Serial.println("Transmitting data to server:");

        Serial.println(jsonString);

        

        int httpResponseCode = http.POST(jsonString);

        

        if (httpResponseCode >= 200 && httpResponseCode < 300) {

            String response = http.getString();

            Serial.print("Transmission successful. Response: ");

            Serial.println(response);

            http.end();

            return true;

        } else {

            Serial.print("Transmission failed. HTTP Code: ");

            Serial.println(httpResponseCode);

            if (httpResponseCode > 0) {

                Serial.print("Error response: ");

                Serial.println(http.getString());

            }

            http.end();

            return false;

        }

    }

    

private:

    String getCurrentTimestamp() {

        // In production, sync with NTP server

        return String(millis());

    }

};


// Global objects

SystemManager systemManager;

AirQualitySensor airSensor(SENSOR_PIN);

DataTransmitter dataTransmitter(SERVER_URL);


void setup() {

    // Initialize random seed

    randomSeed(analogRead(0));

    

    // Initialize system

    systemManager.initialize();

    

    // Record first measurement time

    lastMeasurementTime = millis();

    

    Serial.println("=== System Ready ===");

}


void loop() {

    unsigned long currentTime = millis();

    

    // Check if it's time for next measurement

    if (currentTime - lastMeasurementTime >= MEASUREMENT_INTERVAL) {

        

        if (!systemManager.isSystemReady()) {

            Serial.println("System not ready, skipping measurement");

            delay(5000);

            return;

        }

        

        // Ensure WiFi connection

        if (!systemManager.ensureWiFiConnection()) {

            Serial.println("WiFi connection failed, entering deep sleep");

            systemManager.enterDeepSleep();

            return;

        }

        

        // Indicate measurement in progress

        digitalWrite(LED_PIN, LOW);

        

        Serial.println("\n--- Taking Air Quality Measurement ---");

        

        // Read sensor data

        float ppmValue = airSensor.readAirQualityPPM();

        

        if (ppmValue >= 0) {

            Serial.print("Air Quality Reading: ");

            Serial.print(ppmValue, 2);

            Serial.println(" PPM");

            

            // Transmit data

            bool success = dataTransmitter.transmitData(

                deviceUUID, ppmValue, DEVICE_LATITUDE, DEVICE_LONGITUDE);

            

            if (success) {

                Serial.println("Data transmission successful");

                // Quick blink to indicate success

                for (int i = 0; i < 3; i++) {

                    digitalWrite(LED_PIN, HIGH);

                    delay(100);

                    digitalWrite(LED_PIN, LOW);

                    delay(100);

                }

            } else {

                Serial.println("Data transmission failed");

                // Longer blink to indicate failure

                for (int i = 0; i < 5; i++) {

                    digitalWrite(LED_PIN, HIGH);

                    delay(200);

                    digitalWrite(LED_PIN, LOW);

                    delay(200);

                }

            }

        } else {

            Serial.println("Invalid sensor reading - measurement skipped");

        }

        

        // Return LED to ready state

        digitalWrite(LED_PIN, HIGH);

        

        // Update last measurement time

        lastMeasurementTime = currentTime;

        

        Serial.print("Next measurement in ");

        Serial.print(MEASUREMENT_INTERVAL / 60000);

        Serial.println(" minutes");

    }

    

    // Small delay to prevent excessive CPU usage

    delay(1000);

    

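    // Sanity check, not a true watchdog: it fires only if one measurement pass

    // took more than twice the measurement interval. A hang inside the pass never

    // reaches this line; use a hardware watchdog such as the ESP32 task watchdog

    // for that case.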

    if (millis() - lastMeasurementTime > MEASUREMENT_INTERVAL * 2) {

        Serial.println("System appears unresponsive - restarting");

        ESP.restart();

    }

}
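

The getCurrentTimestamp() placeholder in the listing above returns millis(), which resets on every reboot and cannot be compared across devices. The following is a minimal sketch of what a wall-clock replacement might look like, assuming the system clock has already been set over NTP (on the ESP32 Arduino core this is typically started with configTime(), which is not shown here). The names isClockSynced, formatIso8601Utc and kMinValidEpoch are hypothetical and do not appear in the original firmware, and the 2024-01-01 cutoff is an arbitrary assumption for detecting a clock that was never synchronized.

```cpp
#include <cstddef>
#include <ctime>
#include <string>

// Assumption: any epoch before 2024-01-01T00:00:00Z means the clock was never
// synchronized, because an unsynced clock starts counting from 1970.
constexpr std::time_t kMinValidEpoch = 1704067200;

// True once the clock holds a plausible wall-clock time.
bool isClockSynced(std::time_t epochSeconds) {
    return epochSeconds >= kMinValidEpoch;
}

// Format a Unix epoch (seconds) as an ISO 8601 UTC string such as
// "2024-01-01T00:00:00Z". Returns an empty string if conversion fails.
std::string formatIso8601Utc(std::time_t epochSeconds) {
    const std::tm* converted = std::gmtime(&epochSeconds);
    if (converted == nullptr) {
        return std::string();
    }
    const std::tm utc = *converted;  // copy out of gmtime's static buffer

    char buffer[32];
    const std::size_t written =
        std::strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", &utc);
    return written == 0 ? std::string() : std::string(buffer, written);
}
```

Inside getCurrentTimestamp(), one possible use is to read the clock with time(nullptr), return String(formatIso8601Utc(now).c_str()) when isClockSynced(now) is true, and fall back to the existing millis() value otherwise so that the server can tell the two cases apart.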



Complete Server Package Configuration


package.json (Node.js dependencies; JSON does not allow comments, so this label stays outside the file):


Json

{

  "name": "air-quality-monitoring-server",

  "version": "1.0.0",

  "description": "IoT Air Quality Monitoring System Server",

  "main": "server.js",

  "scripts": {

    "start": "node server.js",

    "dev": "nodemon server.js",

    "test": "jest",

    "lint": "eslint .",

    "setup": "node scripts/setup.js"

  },

  "dependencies": {

    "express": "^4.18.2",

    "mongoose": "^7.5.0",

    "cors": "^2.8.5",

    "helmet": "^7.0.0",

    "express-rate-limit": "^6.8.1",

    "compression": "^1.7.4",

    "express-validator": "^7.0.1",

    "winston": "^3.10.0",

    "@tensorflow/tfjs-node": "^4.10.0",

    "dotenv": "^16.3.1"

  },

  "devDependencies": {

    "nodemon": "^3.0.1",

    "jest": "^29.6.2",

    "eslint": "^8.47.0"

  },

  "engines": {

    "node": ">=16.0.0"

  }

}


Bash


#!/bin/bash

# deploy.sh - Deployment script

# Stop at the first failing command so a broken step is not reported as success

set -e


echo "=== Air Quality Monitoring System Deployment ==="


# Create necessary directories

mkdir -p logs

mkdir -p data

mkdir -p config


# Install dependencies

echo "Installing Node.js dependencies..."

npm install


# Set up environment variables

if [ ! -f .env ]; then

    echo "Creating environment configuration..."

    cat > .env << EOF

NODE_ENV=production

PORT=3000

MONGODB_URI=mongodb://localhost:27017/air_quality_monitoring

LOG_LEVEL=info

ALLOWED_ORIGINS=http://localhost:3000,https://yourdomain.com

EOF

fi


# Create systemd service file

# (replace User and WorkingDirectory below with real values before running)

echo "Creating systemd service..."

sudo tee /etc/systemd/system/air-quality-server.service > /dev/null << EOF

[Unit]

Description=Air Quality Monitoring Server

After=network.target


[Service]

Type=simple

User=ubuntu

WorkingDirectory=/path/to/your/project

ExecStart=/usr/bin/node server.js

Restart=on-failure

RestartSec=10

Environment=NODE_ENV=production


[Install]

WantedBy=multi-user.target

EOF


# Enable and start service

sudo systemctl daemon-reload

sudo systemctl enable air-quality-server

sudo systemctl start air-quality-server


echo "Deployment complete!"

echo "Server status: $(sudo systemctl is-active air-quality-server)"

echo "View logs: sudo journalctl -u air-quality-server -f"


This article has described a complete distributed system for air pollution monitoring. ESP32-based sensor nodes measure air quality at regular intervals and transmit each reading, together with the device identifier and location, to a central server. The server stores the data and analyzes it with artificial intelligence algorithms. The architecture scales by adding nodes rather than changing infrastructure, and users can query the aggregated pollution level for any geographical area with a configurable radius, which supports air quality management and public health protection.
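
The radius-based aggregation mentioned above can be illustrated with a short sketch. It is written in C++ to match the firmware listing, although the server in this article runs on Node.js, so treat it as an illustration of the calculation rather than the server's code. It assumes a spherical Earth with a mean radius of 6371 km and uses the haversine formula for distance; the Reading struct and the functions haversineKm and averagePpmWithin are hypothetical names introduced only for this example.

```cpp
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <vector>

// Hypothetical record: one measurement with the reporting device's position.
struct Reading {
    double latitudeDeg;
    double longitudeDeg;
    double ppm;
};

constexpr double kEarthRadiusKm = 6371.0;  // assumption: mean Earth radius
constexpr double kPi = 3.14159265358979323846;

// Great-circle distance in kilometres between two points (haversine formula).
double haversineKm(double lat1Deg, double lon1Deg,
                   double lat2Deg, double lon2Deg) {
    const double toRad = kPi / 180.0;
    const double dLat = (lat2Deg - lat1Deg) * toRad;
    const double dLon = (lon2Deg - lon1Deg) * toRad;
    const double sinLat = std::sin(dLat / 2.0);
    const double sinLon = std::sin(dLon / 2.0);
    const double a = sinLat * sinLat +
                     std::cos(lat1Deg * toRad) * std::cos(lat2Deg * toRad) *
                         sinLon * sinLon;
    // Clamp to 1.0 to guard against rounding just above the asin domain.
    return 2.0 * kEarthRadiusKm * std::asin(std::sqrt(std::min(1.0, a)));
}

// Averages the PPM of all readings within radiusKm of the center point.
// Returns false, leaving averageOut untouched, when no reading is in range.
bool averagePpmWithin(const std::vector<Reading>& readings,
                      double centerLatDeg, double centerLonDeg,
                      double radiusKm, double& averageOut) {
    double sum = 0.0;
    std::size_t count = 0;
    for (const Reading& r : readings) {
        if (haversineKm(centerLatDeg, centerLonDeg,
                        r.latitudeDeg, r.longitudeDeg) <= radiusKm) {
            sum += r.ppm;
            ++count;
        }
    }
    if (count == 0) {
        return false;
    }
    averageOut = sum / static_cast<double>(count);
    return true;
}
```

Returning false for an empty region, instead of reporting an average of zero, keeps "no sensors nearby" distinguishable from "clean air" in whatever response the query produces.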
