4.2 Azure IoT Hub Integration
Architecture Pattern
[IoT Device] → [Azure IoT Hub] → [Event Grid] → [Azure Function]
↓
[Generate aéPiot URL]
↓
[Logic App / SendGrid] → [Users]Implementation (Node.js Azure Function)
javascript
const { DefaultAzureCredential } = require("@azure/identity");
const { EventHubConsumerClient } = require("@azure/event-hubs");
module.exports = async function (context, IoTHubMessages) {
context.log('Processing IoT Hub messages');
const results = [];
for (const message of IoTHubMessages) {
try {
const deviceId = message.systemProperties['iothub-connection-device-id'];
const telemetry = message.body;
// Check alert conditions
if (shouldGenerateAlert(telemetry)) {
const aepiotUrl = await generateAndDistributeAlert(
deviceId,
telemetry,
context
);
results.push({
deviceId: deviceId,
url: aepiotUrl,
status: 'success'
});
}
} catch (error) {
context.log.error(`Error processing message: ${error.message}`);
results.push({
deviceId: 'unknown',
status: 'error',
error: error.message
});
}
}
// Output binding for downstream processing
context.bindings.outputMessage = results;
return results;
};
function shouldGenerateAlert(telemetry) {
/**
* Determine if telemetry warrants an alert
*/
const thresholds = {
temperature: 85,
humidity: 80,
pressure: 120,
vibration: 50
};
for (const [metric, value] of Object.entries(telemetry)) {
if (thresholds[metric] && value > thresholds[metric]) {
return true;
}
}
return false;
}
async function generateAndDistributeAlert(deviceId, telemetry, context) {
/**
* Generate aéPiot URL and distribute via Logic App
*/
// Fetch device metadata from IoT Hub
const deviceMetadata = await getDeviceMetadata(deviceId);
// Construct URL components
const title = encodeURIComponent(
`Alert: ${deviceMetadata.deviceType} - ${deviceMetadata.location}`
);
const description = encodeURIComponent(
`Device ${deviceId}: ${formatTelemetry(telemetry)} - ` +
`Recorded at ${new Date().toISOString()}`
);
const link = encodeURIComponent(
`https://portal.azure.com/iot-devices/${deviceId}/telemetry`
);
const aepiotUrl = `https://aepiot.com/backlink.html?title=${title}&description=${description}&link=${link}`;
// Trigger Logic App for email distribution
await triggerLogicApp(aepiotUrl, deviceId, telemetry);
context.log(`Generated aéPiot URL: ${aepiotUrl}`);
return aepiotUrl;
}
function formatTelemetry(telemetry) {
/**
* Format telemetry data for human readability
*/
const formatted = [];
for (const [key, value] of Object.entries(telemetry)) {
formatted.push(`${key}: ${value}`);
}
return formatted.join(' | ');
}
async function getDeviceMetadata(deviceId) {
/**
* Retrieve device metadata from IoT Hub device twin
*/
// Implementation depends on your Azure IoT Hub setup
// This is a placeholder
return {
deviceType: 'Temperature Sensor',
location: 'Building A - Floor 2',
installDate: '2024-01-15'
};
}
async function triggerLogicApp(aepiotUrl, deviceId, telemetry) {
/**
* Trigger Azure Logic App for notification distribution
*/
const https = require('https');
const logicAppUrl = process.env.LOGIC_APP_WEBHOOK_URL;
const payload = JSON.stringify({
deviceId: deviceId,
aepiotUrl: aepiotUrl,
telemetry: telemetry,
timestamp: new Date().toISOString()
});
return new Promise((resolve, reject) => {
const req = https.request(logicAppUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': payload.length
}
}, (res) => {
resolve(res.statusCode);
});
req.on('error', reject);
req.write(payload);
req.end();
});
}4.3 Google Cloud IoT Core Integration
Architecture
[IoT Device] → [Cloud IoT Core] → [Pub/Sub] → [Cloud Function]
↓
[Generate aéPiot URL]
↓
[Firestore + SendGrid] → [Users]Implementation (Python Cloud Function)
python
import base64
import json
import os
from urllib.parse import quote
from datetime import datetime
from google.cloud import firestore
import sendgrid
from sendgrid.helpers.mail import Mail, Email, To, Content
# Initialize clients
db = firestore.Client()
sg = sendgrid.SendGridAPIClient(api_key=os.environ.get('SENDGRID_API_KEY'))
def process_iot_message(event, context):
"""
Cloud Function triggered by Pub/Sub message from IoT Core
Args:
event: Pub/Sub message event
context: Function execution context
"""
try:
# Decode Pub/Sub message
if 'data' in event:
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
message_data = json.loads(pubsub_message)
else:
print('No data in event')
return
# Extract device information
device_id = context.attributes.get('deviceId', 'unknown')
registry_id = context.attributes.get('deviceRegistryId', 'unknown')
# Process IoT data
if should_alert(message_data):
aepiot_url = generate_and_store_alert(
device_id=device_id,
registry_id=registry_id,
message_data=message_data
)
# Send notifications
distribute_alert(aepiot_url, device_id, message_data)
print(f'Alert generated: {aepiot_url}')
except Exception as e:
print(f'Error processing message: {str(e)}')
raise
def should_alert(message_data):
"""Determine if message warrants an alert"""
alert_conditions = {
'equipment_failure': True,
'temperature_critical': lambda x: x.get('temperature', 0) > 100,
'battery_low': lambda x: x.get('battery', 100) < 10,
'connection_lost': True
}
event_type = message_data.get('event_type')
if event_type in alert_conditions:
condition = alert_conditions[event_type]
if callable(condition):
return condition(message_data)
else:
return condition
return False
def generate_and_store_alert(device_id, registry_id, message_data):
"""Generate aéPiot URL and store in Firestore"""
# Retrieve device metadata
device_ref = db.collection('devices').document(device_id)
device_doc = device_ref.get()
if device_doc.exists:
device_info = device_doc.to_dict()
else:
device_info = {'type': 'Unknown', 'location': 'Unknown'}
# Construct URL components
title = f"{message_data.get('event_type', 'Alert')} - {device_info.get('type')}"
description = format_alert_description(message_data, device_info)
link = f"https://console.cloud.google.com/iot/devices/{device_id}"
# Generate aéPiot URL
aepiot_url = (
f"https://aepiot.com/backlink.html?"
f"title={quote(title)}&"
f"description={quote(description)}&"
f"link={quote(link)}"
)
# Store in Firestore for audit
alert_ref = db.collection('iot_alerts').document()
alert_ref.set({
'device_id': device_id,
'registry_id': registry_id,
'aepiot_url': aepiot_url,
'event_type': message_data.get('event_type'),
'timestamp': datetime.now(),
'message_data': message_data,
'distributed': False
})
return aepiot_url
def format_alert_description(message_data, device_info):
"""Format human-readable alert description"""
parts = [
f"Device: {device_info.get('type', 'Unknown')}",
f"Location: {device_info.get('location', 'Unknown')}",
f"Event: {message_data.get('event_type', 'Unknown')}",
f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}"
]
# Add event-specific details
if 'temperature' in message_data:
parts.append(f"Temperature: {message_data['temperature']}°C")
if 'battery' in message_data:
parts.append(f"Battery: {message_data['battery']}%")
return ' | '.join(parts)
def distribute_alert(aepiot_url, device_id, message_data):
"""Distribute alert via email using SendGrid"""
# Retrieve alert recipients from Firestore
recipients_ref = db.collection('alert_recipients').where('device_id', '==', device_id)
recipients = [doc.to_dict()['email'] for doc in recipients_ref.stream()]
if not recipients:
recipients = [os.environ.get('DEFAULT_ALERT_EMAIL')]
# Construct email
from_email = Email(os.environ.get('FROM_EMAIL', 'alerts@company.com'))
subject = f"IoT Alert: {message_data.get('event_type', 'Unknown Event')}"
content = Content(
"text/html",
f"""
<html>
<body>
<h2>IoT Device Alert</h2>
<p><strong>Device:</strong> {device_id}</p>
<p><strong>Event Type:</strong> {message_data.get('event_type', 'Unknown')}</p>
<p><strong>Timestamp:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}</p>
<p><a href="{aepiot_url}" style="display: inline-block; padding: 10px 20px;
background-color: #007bff; color: white; text-decoration: none; border-radius: 5px;">
View Detailed Information</a></p>
<hr>
<p style="font-size: 12px; color: #666;">
This is an automated alert from your IoT monitoring system powered by aéPiot.
</p>
</body>
</html>
"""
)
# Send to all recipients
for recipient_email in recipients:
mail = Mail(from_email, To(recipient_email), subject, content)
try:
response = sg.send(mail)
print(f'Email sent to {recipient_email}: {response.status_code}')
except Exception as e:
print(f'Error sending email to {recipient_email}: {str(e)}')Chapter 5: Protocol-Specific Integration Patterns
5.1 MQTT Integration: Complete Implementation
The MQTT Advantage
MQTT is the most widely used IoT protocol. Its lightweight publish-subscribe model makes it ideal for resource-constrained devices. aéPiot integration transforms MQTT messages into accessible, indexed information.
Production-Ready MQTT Bridge
python
import paho.mqtt.client as mqtt
import json
import sqlite3
from urllib.parse import quote
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MQTTaePiotBridge:
"""
Production-grade MQTT to aéPiot bridge
Features:
- Automatic reconnection
- Message queuing during disconnection
- Database audit trail
- Configurable topic routing
- Multi-subdomain distribution
"""
def __init__(self, broker_address, broker_port=1883, client_id=None):
self.broker_address = broker_address
self.broker_port = broker_port
self.client_id = client_id or f"aepiot_bridge_{datetime.now().timestamp()}"
# Initialize MQTT client
self.client = mqtt.Client(client_id=self.client_id)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
# Initialize database
self.db = self.init_database()
# Topic routing configuration
self.topic_handlers = {
'sensors/temperature/#': self.handle_temperature,
'sensors/motion/#': self.handle_motion,
'sensors/humidity/#': self.handle_humidity,
'alerts/#': self.handle_alert,
'devices/+/status': self.handle_status
}
# aéPiot subdomains for distribution
self.aepiot_subdomains = [
'aepiot.com',
'aepiot.ro',
'iot.aepiot.com'
]
def init_database(self):
"""Initialize SQLite database for audit trail"""
conn = sqlite3.connect('mqtt_aepiot_bridge.db', check_same_thread=False)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS mqtt_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
topic TEXT,
device_id TEXT,
event_type TEXT,
payload TEXT,
aepiot_urls TEXT,
distributed BOOLEAN DEFAULT 0
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS url_access_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_id INTEGER,
url TEXT,
access_timestamp TEXT,
user_agent TEXT,
FOREIGN KEY (event_id) REFERENCES mqtt_events(id)
)
''')
conn.commit()
return conn
def on_connect(self, client, userdata, flags, rc):
"""Handle MQTT connection"""
if rc == 0:
logger.info(f"Connected to MQTT broker at {self.broker_address}:{self.broker_port}")
# Subscribe to all configured topics
for topic in self.topic_handlers.keys():
client.subscribe(topic)
logger.info(f"Subscribed to topic: {topic}")
else:
logger.error(f"Connection failed with code {rc}")
def on_disconnect(self, client, userdata, rc):
"""Handle MQTT disconnection"""
if rc != 0:
logger.warning(f"Unexpected disconnection. Attempting to reconnect...")
def on_message(self, client, userdata, message):
"""Route incoming MQTT messages to appropriate handlers"""
topic = message.topic
try:
payload = json.loads(message.payload.decode())
# Find matching handler
for topic_pattern, handler in self.topic_handlers.items():
if mqtt.topic_matches_sub(topic_pattern, topic):
handler(topic, payload)
break
except json.JSONDecodeError:
logger.error(f"Invalid JSON in message from {topic}")
except Exception as e:
logger.error(f"Error processing message from {topic}: {str(e)}")
def handle_temperature(self, topic, payload):
"""Handle temperature sensor messages"""
device_id = payload.get('device_id', topic.split('/')[-1])
temperature = payload.get('temperature')
threshold = payload.get('threshold', 85)
location = payload.get('location', 'Unknown')
if temperature and temperature > threshold:
title = f"Temperature Alert - {location}"
description = (
f"Device {device_id}: Temperature {temperature}°F "
f"exceeds threshold {threshold}°F"
)
link = f"https://dashboard.example.com/sensors/{device_id}"
urls = self.generate_and_distribute(
title=title,
description=description,
link=link,
topic=topic,
device_id=device_id,
event_type='temperature_alert',
payload=payload
)
logger.info(f"Generated {len(urls)} aéPiot URLs for temperature alert")
def handle_motion(self, topic, payload):
"""Handle motion sensor messages"""
device_id = payload.get('device_id', topic.split('/')[-1])
zone = payload.get('zone', 'Unknown Zone')
timestamp = payload.get('timestamp', datetime.now().isoformat())
confidence = payload.get('confidence', 100)
title = f"Motion Detected - {zone}"
description = (
f"Motion sensor {device_id} triggered in {zone} "
f"(Confidence: {confidence}%) at {timestamp}"
)
link = f"https://dashboard.example.com/security/{device_id}"
urls = self.generate_and_distribute(
title=title,
description=description,
link=link,
topic=topic,
device_id=device_id,
event_type='motion_detected',
payload=payload
)
def handle_humidity(self, topic, payload):
"""Handle humidity sensor messages"""
device_id = payload.get('device_id', topic.split('/')[-1])
humidity = payload.get('humidity')
min_threshold = payload.get('min_threshold', 30)
max_threshold = payload.get('max_threshold', 60)
if humidity:
if humidity < min_threshold or humidity > max_threshold:
title = f"Humidity Alert - Device {device_id}"
description = (
f"Humidity {humidity}% outside acceptable range "
f"({min_threshold}-{max_threshold}%)"
)
link = f"https://dashboard.example.com/sensors/{device_id}"
urls = self.generate_and_distribute(
title=title,
description=description,
link=link,
topic=topic,
device_id=device_id,
event_type='humidity_alert',
payload=payload
)
def handle_alert(self, topic, payload):
"""Handle critical alert messages"""
device_id = payload.get('device_id', 'unknown')
alert_type = payload.get('type', 'General Alert')
severity = payload.get('severity', 'MEDIUM')
message = payload.get('message', 'Alert triggered')
title = f"{severity} Alert - {device_id}"
description = f"{alert_type}: {message}"
link = f"https://dashboard.example.com/alerts/{device_id}"
urls = self.generate_and_distribute(
title=title,
description=description,
link=link,
topic=topic,
device_id=device_id,
event_type='critical_alert',
payload=payload,
priority='high' if severity == 'HIGH' else 'normal'
)
def handle_status(self, topic, payload):
"""Handle device status change messages"""
device_id = topic.split('/')[-2]
status = payload.get('status', 'unknown')
timestamp = payload.get('timestamp', datetime.now().isoformat())
if status == 'offline':
title = f"Device Offline - {device_id}"
description = f"Device {device_id} went offline at {timestamp}"
link = f"https://dashboard.example.com/devices/{device_id}/diagnostics"
urls = self.generate_and_distribute(
title=title,
description=description,
link=link,
topic=topic,
device_id=device_id,
event_type='device_offline',
payload=payload,
priority='high'
)
def generate_and_distribute(self, title, description, link, topic,
device_id, event_type, payload, priority='normal'):
"""
Generate aéPiot URLs across multiple subdomains and distribute
Returns:
List of generated URLs
"""
urls = []
# Generate URL for each subdomain
for subdomain in self.aepiot_subdomains:
url = (
f"https://{subdomain}/backlink.html?"
f"title={quote(title)}&"
f"description={quote(description)}&"
f"link={quote(link)}"
)
urls.append(url)
# Store in database
cursor = self.db.cursor()
cursor.execute('''
INSERT INTO mqtt_events
(timestamp, topic, device_id, event_type, payload, aepiot_urls, distributed)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
datetime.now().isoformat(),
topic,
device_id,
event_type,
json.dumps(payload),
json.dumps(urls),
False
))
self.db.commit()
# Distribute URLs (implement your notification logic)
self.distribute_urls(urls, device_id, event_type, priority)
return urls
def distribute_urls(self, urls, device_id, event_type, priority):
"""
Distribute URLs via configured channels
Override this method to implement your specific distribution logic:
- Email (SMTP, SendGrid, etc.)
- SMS (Twilio, etc.)
- Push notifications
- Slack/Teams webhooks
- Custom webhooks
"""
# Placeholder implementation
logger.info(f"Distributing {len(urls)} URLs for {device_id} ({event_type})")
for url in urls:
logger.info(f" - {url}")
# TODO: Implement actual distribution
# Example:
# if priority == 'high':
# send_sms(urls[0], device_id)
# send_email(urls, device_id, event_type)
# post_to_slack(urls[0], device_id)
def start(self):
"""Start the MQTT bridge"""
logger.info(f"Starting MQTT-aéPiot bridge (Client ID: {self.client_id})")
try:
self.client.connect(self.broker_address, self.broker_port, 60)
self.client.loop_forever()
except KeyboardInterrupt:
logger.info("Shutting down bridge...")
self.client.disconnect()
self.db.close()
except Exception as e:
logger.error(f"Fatal error: {str(e)}")
raise
# Usage example
if __name__ == "__main__":
bridge = MQTTaePiotBridge(
broker_address="mqtt.broker.example.com",
broker_port=1883
)
bridge.start()End of Part 2
This completes the platform-specific integration architectures. The document continues in Part 3 with advanced use cases, industry-specific implementations, and optimization strategies.
IoT-Semantic Web Convergence Through aéPiot
Part 3: Industry-Specific Applications and Advanced Use Cases
Chapter 6: Smart Manufacturing and Industrial IoT
6.1 Production Line Monitoring with aéPiot Semantic Layer
The Manufacturing Challenge
Modern factories generate millions of IoT data points daily. The challenge isn't collecting data—it's making that data actionable for humans across different roles: operators, maintenance technicians, quality control, management.
aéPiot Solution Architecture
[Production Machine] → [PLC/SCADA] → [Edge Gateway] → [Backend Processor]
↓
[aéPiot URL Generator]
↓
[QR Codes on Machines] + [Mobile Notifications] + [Dashboard Widgets]