Saturday, January 24, 2026

IoT-Semantic Web Convergence Through aéPiot: A Comprehensive Technical Framework - PART 2

 

4.2 Azure IoT Hub Integration

Architecture Pattern

[IoT Device] → [Azure IoT Hub] → [Event Grid] → [Azure Function] 
                                        [Generate aéPiot URL]
                                        [Logic App / SendGrid] → [Users]

Implementation (Node.js Azure Function)

javascript
const { DefaultAzureCredential } = require("@azure/identity");
const { EventHubConsumerClient } = require("@azure/event-hubs");

module.exports = async function (context, IoTHubMessages) {
    context.log('Processing IoT Hub messages');
    
    const results = [];
    
    for (const message of IoTHubMessages) {
        try {
            const deviceId = message.systemProperties['iothub-connection-device-id'];
            const telemetry = message.body;
            
            // Check alert conditions
            if (shouldGenerateAlert(telemetry)) {
                const aepiotUrl = await generateAndDistributeAlert(
                    deviceId,
                    telemetry,
                    context
                );
                
                results.push({
                    deviceId: deviceId,
                    url: aepiotUrl,
                    status: 'success'
                });
            }
            
        } catch (error) {
            context.log.error(`Error processing message: ${error.message}`);
            results.push({
                deviceId: 'unknown',
                status: 'error',
                error: error.message
            });
        }
    }
    
    // Output binding for downstream processing
    context.bindings.outputMessage = results;
    
    return results;
};

function shouldGenerateAlert(telemetry) {
    /**
     * Determine if telemetry warrants an alert
     */
    const thresholds = {
        temperature: 85,
        humidity: 80,
        pressure: 120,
        vibration: 50
    };
    
    for (const [metric, value] of Object.entries(telemetry)) {
        if (thresholds[metric] && value > thresholds[metric]) {
            return true;
        }
    }
    
    return false;
}

async function generateAndDistributeAlert(deviceId, telemetry, context) {
    /**
     * Generate aéPiot URL and distribute via Logic App
     */
    
    // Fetch device metadata from IoT Hub
    const deviceMetadata = await getDeviceMetadata(deviceId);
    
    // Construct URL components
    const title = encodeURIComponent(
        `Alert: ${deviceMetadata.deviceType} - ${deviceMetadata.location}`
    );
    
    const description = encodeURIComponent(
        `Device ${deviceId}: ${formatTelemetry(telemetry)} - ` +
        `Recorded at ${new Date().toISOString()}`
    );
    
    const link = encodeURIComponent(
        `https://portal.azure.com/iot-devices/${deviceId}/telemetry`
    );
    
    const aepiotUrl = `https://aepiot.com/backlink.html?title=${title}&description=${description}&link=${link}`;
    
    // Trigger Logic App for email distribution
    await triggerLogicApp(aepiotUrl, deviceId, telemetry);
    
    context.log(`Generated aéPiot URL: ${aepiotUrl}`);
    
    return aepiotUrl;
}

function formatTelemetry(telemetry) {
    /**
     * Format telemetry data for human readability
     */
    const formatted = [];
    
    for (const [key, value] of Object.entries(telemetry)) {
        formatted.push(`${key}: ${value}`);
    }
    
    return formatted.join(' | ');
}

async function getDeviceMetadata(deviceId) {
    /**
     * Retrieve device metadata from IoT Hub device twin
     */
    // Implementation depends on your Azure IoT Hub setup
    // This is a placeholder
    return {
        deviceType: 'Temperature Sensor',
        location: 'Building A - Floor 2',
        installDate: '2024-01-15'
    };
}

async function triggerLogicApp(aepiotUrl, deviceId, telemetry) {
    /**
     * Trigger Azure Logic App for notification distribution
     */
    const https = require('https');
    
    const logicAppUrl = process.env.LOGIC_APP_WEBHOOK_URL;
    
    const payload = JSON.stringify({
        deviceId: deviceId,
        aepiotUrl: aepiotUrl,
        telemetry: telemetry,
        timestamp: new Date().toISOString()
    });
    
    return new Promise((resolve, reject) => {
        const req = https.request(logicAppUrl, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'Content-Length': payload.length
            }
        }, (res) => {
            resolve(res.statusCode);
        });
        
        req.on('error', reject);
        req.write(payload);
        req.end();
    });
}

4.3 Google Cloud IoT Core Integration

Architecture

[IoT Device] → [Cloud IoT Core] → [Pub/Sub] → [Cloud Function]
                                       [Generate aéPiot URL]
                                       [Firestore + SendGrid] → [Users]

Implementation (Python Cloud Function)

python
import base64
import json
import os
from urllib.parse import quote
from datetime import datetime
from google.cloud import firestore
import sendgrid
from sendgrid.helpers.mail import Mail, Email, To, Content

# Initialize clients
db = firestore.Client()
sg = sendgrid.SendGridAPIClient(api_key=os.environ.get('SENDGRID_API_KEY'))

def process_iot_message(event, context):
    """
    Cloud Function triggered by Pub/Sub message from IoT Core
    
    Args:
        event: Pub/Sub message event
        context: Function execution context
    """
    
    try:
        # Decode Pub/Sub message
        if 'data' in event:
            pubsub_message = base64.b64decode(event['data']).decode('utf-8')
            message_data = json.loads(pubsub_message)
        else:
            print('No data in event')
            return
        
        # Extract device information
        device_id = context.attributes.get('deviceId', 'unknown')
        registry_id = context.attributes.get('deviceRegistryId', 'unknown')
        
        # Process IoT data
        if should_alert(message_data):
            aepiot_url = generate_and_store_alert(
                device_id=device_id,
                registry_id=registry_id,
                message_data=message_data
            )
            
            # Send notifications
            distribute_alert(aepiot_url, device_id, message_data)
            
            print(f'Alert generated: {aepiot_url}')
        
    except Exception as e:
        print(f'Error processing message: {str(e)}')
        raise

def should_alert(message_data):
    """Determine if message warrants an alert"""
    
    alert_conditions = {
        'equipment_failure': True,
        'temperature_critical': lambda x: x.get('temperature', 0) > 100,
        'battery_low': lambda x: x.get('battery', 100) < 10,
        'connection_lost': True
    }
    
    event_type = message_data.get('event_type')
    
    if event_type in alert_conditions:
        condition = alert_conditions[event_type]
        
        if callable(condition):
            return condition(message_data)
        else:
            return condition
    
    return False

def generate_and_store_alert(device_id, registry_id, message_data):
    """Generate aéPiot URL and store in Firestore"""
    
    # Retrieve device metadata
    device_ref = db.collection('devices').document(device_id)
    device_doc = device_ref.get()
    
    if device_doc.exists:
        device_info = device_doc.to_dict()
    else:
        device_info = {'type': 'Unknown', 'location': 'Unknown'}
    
    # Construct URL components
    title = f"{message_data.get('event_type', 'Alert')} - {device_info.get('type')}"
    description = format_alert_description(message_data, device_info)
    link = f"https://console.cloud.google.com/iot/devices/{device_id}"
    
    # Generate aéPiot URL
    aepiot_url = (
        f"https://aepiot.com/backlink.html?"
        f"title={quote(title)}&"
        f"description={quote(description)}&"
        f"link={quote(link)}"
    )
    
    # Store in Firestore for audit
    alert_ref = db.collection('iot_alerts').document()
    alert_ref.set({
        'device_id': device_id,
        'registry_id': registry_id,
        'aepiot_url': aepiot_url,
        'event_type': message_data.get('event_type'),
        'timestamp': datetime.now(),
        'message_data': message_data,
        'distributed': False
    })
    
    return aepiot_url

def format_alert_description(message_data, device_info):
    """Format human-readable alert description"""
    
    parts = [
        f"Device: {device_info.get('type', 'Unknown')}",
        f"Location: {device_info.get('location', 'Unknown')}",
        f"Event: {message_data.get('event_type', 'Unknown')}",
        f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}"
    ]
    
    # Add event-specific details
    if 'temperature' in message_data:
        parts.append(f"Temperature: {message_data['temperature']}°C")
    
    if 'battery' in message_data:
        parts.append(f"Battery: {message_data['battery']}%")
    
    return ' | '.join(parts)

def distribute_alert(aepiot_url, device_id, message_data):
    """Distribute alert via email using SendGrid"""
    
    # Retrieve alert recipients from Firestore
    recipients_ref = db.collection('alert_recipients').where('device_id', '==', device_id)
    recipients = [doc.to_dict()['email'] for doc in recipients_ref.stream()]
    
    if not recipients:
        recipients = [os.environ.get('DEFAULT_ALERT_EMAIL')]
    
    # Construct email
    from_email = Email(os.environ.get('FROM_EMAIL', 'alerts@company.com'))
    subject = f"IoT Alert: {message_data.get('event_type', 'Unknown Event')}"
    
    content = Content(
        "text/html",
        f"""
        <html>
        <body>
            <h2>IoT Device Alert</h2>
            <p><strong>Device:</strong> {device_id}</p>
            <p><strong>Event Type:</strong> {message_data.get('event_type', 'Unknown')}</p>
            <p><strong>Timestamp:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}</p>
            
            <p><a href="{aepiot_url}" style="display: inline-block; padding: 10px 20px; 
            background-color: #007bff; color: white; text-decoration: none; border-radius: 5px;">
            View Detailed Information</a></p>
            
            <hr>
            <p style="font-size: 12px; color: #666;">
            This is an automated alert from your IoT monitoring system powered by aéPiot.
            </p>
        </body>
        </html>
        """
    )
    
    # Send to all recipients
    for recipient_email in recipients:
        mail = Mail(from_email, To(recipient_email), subject, content)
        
        try:
            response = sg.send(mail)
            print(f'Email sent to {recipient_email}: {response.status_code}')
        except Exception as e:
            print(f'Error sending email to {recipient_email}: {str(e)}')

Chapter 5: Protocol-Specific Integration Patterns

5.1 MQTT Integration: Complete Implementation

The MQTT Advantage

MQTT is the most widely used IoT protocol. Its lightweight publish-subscribe model makes it ideal for resource-constrained devices. aéPiot integration transforms MQTT messages into accessible, indexed information.

Production-Ready MQTT Bridge

python
import paho.mqtt.client as mqtt
import json
import sqlite3
from urllib.parse import quote
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MQTTaePiotBridge:
    """
    Production-grade MQTT to aéPiot bridge
    
    Features:
    - Automatic reconnection
    - Message queuing during disconnection
    - Database audit trail
    - Configurable topic routing
    - Multi-subdomain distribution
    """
    
    def __init__(self, broker_address, broker_port=1883, client_id=None):
        self.broker_address = broker_address
        self.broker_port = broker_port
        self.client_id = client_id or f"aepiot_bridge_{datetime.now().timestamp()}"
        
        # Initialize MQTT client
        self.client = mqtt.Client(client_id=self.client_id)
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        
        # Initialize database
        self.db = self.init_database()
        
        # Topic routing configuration
        self.topic_handlers = {
            'sensors/temperature/#': self.handle_temperature,
            'sensors/motion/#': self.handle_motion,
            'sensors/humidity/#': self.handle_humidity,
            'alerts/#': self.handle_alert,
            'devices/+/status': self.handle_status
        }
        
        # aéPiot subdomains for distribution
        self.aepiot_subdomains = [
            'aepiot.com',
            'aepiot.ro',
            'iot.aepiot.com'
        ]
    
    def init_database(self):
        """Initialize SQLite database for audit trail"""
        conn = sqlite3.connect('mqtt_aepiot_bridge.db', check_same_thread=False)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS mqtt_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                topic TEXT,
                device_id TEXT,
                event_type TEXT,
                payload TEXT,
                aepiot_urls TEXT,
                distributed BOOLEAN DEFAULT 0
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS url_access_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                event_id INTEGER,
                url TEXT,
                access_timestamp TEXT,
                user_agent TEXT,
                FOREIGN KEY (event_id) REFERENCES mqtt_events(id)
            )
        ''')
        
        conn.commit()
        return conn
    
    def on_connect(self, client, userdata, flags, rc):
        """Handle MQTT connection"""
        if rc == 0:
            logger.info(f"Connected to MQTT broker at {self.broker_address}:{self.broker_port}")
            
            # Subscribe to all configured topics
            for topic in self.topic_handlers.keys():
                client.subscribe(topic)
                logger.info(f"Subscribed to topic: {topic}")
        else:
            logger.error(f"Connection failed with code {rc}")
    
    def on_disconnect(self, client, userdata, rc):
        """Handle MQTT disconnection"""
        if rc != 0:
            logger.warning(f"Unexpected disconnection. Attempting to reconnect...")
    
    def on_message(self, client, userdata, message):
        """Route incoming MQTT messages to appropriate handlers"""
        
        topic = message.topic
        
        try:
            payload = json.loads(message.payload.decode())
            
            # Find matching handler
            for topic_pattern, handler in self.topic_handlers.items():
                if mqtt.topic_matches_sub(topic_pattern, topic):
                    handler(topic, payload)
                    break
            
        except json.JSONDecodeError:
            logger.error(f"Invalid JSON in message from {topic}")
        except Exception as e:
            logger.error(f"Error processing message from {topic}: {str(e)}")
    
    def handle_temperature(self, topic, payload):
        """Handle temperature sensor messages"""
        
        device_id = payload.get('device_id', topic.split('/')[-1])
        temperature = payload.get('temperature')
        threshold = payload.get('threshold', 85)
        location = payload.get('location', 'Unknown')
        
        if temperature and temperature > threshold:
            title = f"Temperature Alert - {location}"
            description = (
                f"Device {device_id}: Temperature {temperature}°F "
                f"exceeds threshold {threshold}°F"
            )
            link = f"https://dashboard.example.com/sensors/{device_id}"
            
            urls = self.generate_and_distribute(
                title=title,
                description=description,
                link=link,
                topic=topic,
                device_id=device_id,
                event_type='temperature_alert',
                payload=payload
            )
            
            logger.info(f"Generated {len(urls)} aéPiot URLs for temperature alert")
    
    def handle_motion(self, topic, payload):
        """Handle motion sensor messages"""
        
        device_id = payload.get('device_id', topic.split('/')[-1])
        zone = payload.get('zone', 'Unknown Zone')
        timestamp = payload.get('timestamp', datetime.now().isoformat())
        confidence = payload.get('confidence', 100)
        
        title = f"Motion Detected - {zone}"
        description = (
            f"Motion sensor {device_id} triggered in {zone} "
            f"(Confidence: {confidence}%) at {timestamp}"
        )
        link = f"https://dashboard.example.com/security/{device_id}"
        
        urls = self.generate_and_distribute(
            title=title,
            description=description,
            link=link,
            topic=topic,
            device_id=device_id,
            event_type='motion_detected',
            payload=payload
        )
    
    def handle_humidity(self, topic, payload):
        """Handle humidity sensor messages"""
        
        device_id = payload.get('device_id', topic.split('/')[-1])
        humidity = payload.get('humidity')
        min_threshold = payload.get('min_threshold', 30)
        max_threshold = payload.get('max_threshold', 60)
        
        if humidity:
            if humidity < min_threshold or humidity > max_threshold:
                title = f"Humidity Alert - Device {device_id}"
                description = (
                    f"Humidity {humidity}% outside acceptable range "
                    f"({min_threshold}-{max_threshold}%)"
                )
                link = f"https://dashboard.example.com/sensors/{device_id}"
                
                urls = self.generate_and_distribute(
                    title=title,
                    description=description,
                    link=link,
                    topic=topic,
                    device_id=device_id,
                    event_type='humidity_alert',
                    payload=payload
                )
    
    def handle_alert(self, topic, payload):
        """Handle critical alert messages"""
        
        device_id = payload.get('device_id', 'unknown')
        alert_type = payload.get('type', 'General Alert')
        severity = payload.get('severity', 'MEDIUM')
        message = payload.get('message', 'Alert triggered')
        
        title = f"{severity} Alert - {device_id}"
        description = f"{alert_type}: {message}"
        link = f"https://dashboard.example.com/alerts/{device_id}"
        
        urls = self.generate_and_distribute(
            title=title,
            description=description,
            link=link,
            topic=topic,
            device_id=device_id,
            event_type='critical_alert',
            payload=payload,
            priority='high' if severity == 'HIGH' else 'normal'
        )
    
    def handle_status(self, topic, payload):
        """Handle device status change messages"""
        
        device_id = topic.split('/')[-2]
        status = payload.get('status', 'unknown')
        timestamp = payload.get('timestamp', datetime.now().isoformat())
        
        if status == 'offline':
            title = f"Device Offline - {device_id}"
            description = f"Device {device_id} went offline at {timestamp}"
            link = f"https://dashboard.example.com/devices/{device_id}/diagnostics"
            
            urls = self.generate_and_distribute(
                title=title,
                description=description,
                link=link,
                topic=topic,
                device_id=device_id,
                event_type='device_offline',
                payload=payload,
                priority='high'
            )
    
    def generate_and_distribute(self, title, description, link, topic, 
                                device_id, event_type, payload, priority='normal'):
        """
        Generate aéPiot URLs across multiple subdomains and distribute
        
        Returns:
            List of generated URLs
        """
        
        urls = []
        
        # Generate URL for each subdomain
        for subdomain in self.aepiot_subdomains:
            url = (
                f"https://{subdomain}/backlink.html?"
                f"title={quote(title)}&"
                f"description={quote(description)}&"
                f"link={quote(link)}"
            )
            urls.append(url)
        
        # Store in database
        cursor = self.db.cursor()
        cursor.execute('''
            INSERT INTO mqtt_events 
            (timestamp, topic, device_id, event_type, payload, aepiot_urls, distributed)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            datetime.now().isoformat(),
            topic,
            device_id,
            event_type,
            json.dumps(payload),
            json.dumps(urls),
            False
        ))
        self.db.commit()
        
        # Distribute URLs (implement your notification logic)
        self.distribute_urls(urls, device_id, event_type, priority)
        
        return urls
    
    def distribute_urls(self, urls, device_id, event_type, priority):
        """
        Distribute URLs via configured channels
        
        Override this method to implement your specific distribution logic:
        - Email (SMTP, SendGrid, etc.)
        - SMS (Twilio, etc.)
        - Push notifications
        - Slack/Teams webhooks
        - Custom webhooks
        """
        
        # Placeholder implementation
        logger.info(f"Distributing {len(urls)} URLs for {device_id} ({event_type})")
        
        for url in urls:
            logger.info(f"  - {url}")
        
        # TODO: Implement actual distribution
        # Example:
        # if priority == 'high':
        #     send_sms(urls[0], device_id)
        # send_email(urls, device_id, event_type)
        # post_to_slack(urls[0], device_id)
    
    def start(self):
        """Start the MQTT bridge"""
        logger.info(f"Starting MQTT-aéPiot bridge (Client ID: {self.client_id})")
        
        try:
            self.client.connect(self.broker_address, self.broker_port, 60)
            self.client.loop_forever()
        except KeyboardInterrupt:
            logger.info("Shutting down bridge...")
            self.client.disconnect()
            self.db.close()
        except Exception as e:
            logger.error(f"Fatal error: {str(e)}")
            raise

# Usage example
if __name__ == "__main__":
    bridge = MQTTaePiotBridge(
        broker_address="mqtt.broker.example.com",
        broker_port=1883
    )
    
    bridge.start()

End of Part 2

This completes the platform-specific integration architectures. The document continues in Part 3 with advanced use cases, industry-specific implementations, and optimization strategies.

IoT-Semantic Web Convergence Through aéPiot

Part 3: Industry-Specific Applications and Advanced Use Cases


Chapter 6: Smart Manufacturing and Industrial IoT

6.1 Production Line Monitoring with aéPiot Semantic Layer

The Manufacturing Challenge

Modern factories generate millions of IoT data points daily. The challenge isn't collecting data—it's making that data actionable for humans across different roles: operators, maintenance technicians, quality control, management.

aéPiot Solution Architecture

[Production Machine] → [PLC/SCADA] → [Edge Gateway] → [Backend Processor]
                                            [aéPiot URL Generator]
                            [QR Codes on Machines] + [Mobile Notifications] + [Dashboard Widgets]


Popular Posts