IoT Platform - System Design for Senior Engineers
Introduction
If you are designing an IoT platform, you are facing one of the hardest problems in distributed systems: managing millions (or billions) of devices with constrained hardware and unstable connectivity, while meeting real-time, low-latency processing requirements.
IoT-Specific Challenges
Traditional Web API vs IoT Platform

| Traditional Web API | IoT Platform |
|---|---|
| 1M users/day | 100M devices, always-on |
| Rich clients (browser/mobile) | Constrained devices (256KB RAM) |
| Stable connectivity | Intermittent network (2G/3G) |
| Request-response model | Push + bidirectional |
| Stateless architecture | Stateful (device shadows) |
| Human-generated data | High-frequency telemetry |
Key challenges:
- Scale: Billions of devices, millions of concurrent connections
- Hardware constraints: 256KB RAM, 32KB flash, 8-bit CPU
- Connectivity: Intermittent, high latency (satellite), low bandwidth
- Power: Battery-powered devices (years of lifetime)
- Security: Physical access to devices, firmware vulnerabilities
- Heterogeneity: Protocols (MQTT, CoAP, HTTP), data formats, vendors
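The hardware and bandwidth constraints above are why many IoT deployments avoid verbose formats on the wire. A minimal sketch (the field layout here is a hypothetical example, not a standard) comparing a JSON telemetry reading with a packed binary encoding:

```python
import json
import struct

# Hypothetical reading: device id (uint32), unix timestamp (uint32),
# temperature scaled by 100 to fit a signed 16-bit int
reading = {"device_id": 12345, "ts": 1678901234, "temp": 23.51}

json_payload = json.dumps(reading).encode()  # human-readable, verbose
packed = struct.pack(
    "!IIh",                       # network byte order: uint32, uint32, int16
    reading["device_id"],
    reading["ts"],
    round(reading["temp"] * 100), # round, not int(): avoids float truncation
)

print(len(json_payload), "bytes as JSON vs", len(packed), "bytes packed")
```

On a 2G link billed per byte, a 10-byte fixed-width record instead of ~50 bytes of JSON is the difference between a viable and an unaffordable data plan.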
IoT Platform Architecture (High-Level)

┌───────────────────────────────────────────────────────────────────┐
│                        IoT Devices Layer                          │
│  [Sensors]  [Actuators]  [Edge Gateways]  [Industrial Equipment]  │
└────────────┬────────────────────────────┬─────────────────────────┘
             │ MQTT/CoAP/HTTP             │ TLS/DTLS
             ▼                            ▼
┌─────────────────────────┐    ┌───────────────────────┐
│    Protocol Gateway     │    │    Device Gateway     │
│  (MQTT Broker Cluster)  │    │    (Load Balancer)    │
└─────────┬───────────────┘    └───────────┬───────────┘
          │                                │
          ▼                                ▼
┌──────────────────────────────────────────────────────┐
│              Message Router / Event Hub              │
│          (Kafka / Kinesis / Azure Event Hub)         │
└─────┬───────────────┬───────────────┬────────────────┘
      │               │               │
      ▼               ▼               ▼
┌──────────┐   ┌─────────────┐  ┌─────────────┐
│ Telemetry│   │   Command   │  │   Device    │
│ Ingestion│   │   Service   │  │  Registry   │
└─────┬────┘   └──────┬──────┘  └──────┬──────┘
      │               │                │
      ▼               ▼                ▼
┌────────────┐  ┌──────────┐  ┌─────────────┐
│ Time-Series│  │  Device  │  │  Postgres   │
│ DB (Influx)│  │ Shadows  │  │ (metadata)  │
└────────────┘  └──────────┘  └─────────────┘
Device Management
Device Provisioning at Scale
Challenge: How do you provision 1 million devices without manually configuring each one?
Strategies:
- Zero-Touch Provisioning (ZTP)
  Device Boot → DHCP → Download config from server → Authenticate → Activate
- Certificate-based Auto-enrollment (like AWS IoT Just-in-Time Registration)
# Device-side: Generate CSR and request certificate
import json
import ssl
import paho.mqtt.client as mqtt
def provision_device():
# Step 1: Generate key pair (done once in factory)
device_id = get_hardware_id() # MAC address, serial number
# Step 2: Connect with claim certificate (embedded in firmware)
client = mqtt.Client(client_id=device_id)
client.tls_set(
ca_certs="root-ca.pem",
certfile="claim-cert.pem", # Factory-issued
keyfile="claim-key.pem"
)
# Step 3: Publish provisioning request
client.connect("provisioning.iot.example.com", 8883)
client.publish(
f"$aws/certificates/create-from-csr/json",
payload=json.dumps({
"csr": generate_csr(device_id),
"template": "production-template"
})
)
# Step 4: Receive unique certificate
    # Server validates claim cert → issues device cert → stores in registry
# Platform-side: Auto-registration Lambda
def on_certificate_created(event):
cert_id = event['certificateId']
device_id = extract_cn(event['csr'])
# Validate against whitelist (factory manifest)
if not is_authorized(device_id):
revoke_certificate(cert_id)
return
# Create device in registry
create_device(device_id, cert_id)
# Attach policy (publish/subscribe permissions)
attach_policy(cert_id, "device-policy")
# Initialize device shadow
create_shadow(device_id, {"state": {"desired": {}}})
Trade-offs:
- Claim cert: Simpler, but if the claim certificate leaks, an attacker can provision rogue devices
- Pre-registration: More secure (whitelist of serial numbers), but requires manually uploading the factory manifest
- Public Key Infrastructure (PKI): Most scalable, but complex (CA hierarchy, CRL/OCSP)
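The registration Lambda above calls an `is_authorized` check against the factory manifest. A minimal sketch of that idea (the manifest contents and one-shot rule are assumptions for illustration, not AWS behavior): allow each factory serial to provision exactly once, so a leaked claim cert cannot be replayed for an already-registered serial.

```python
# Hypothetical factory manifest: serial numbers that actually left the factory
FACTORY_MANIFEST = {"SN-0001", "SN-0002", "SN-0003"}
provisioned = set()  # in production this lives in the device registry

def authorize_provisioning(device_id: str) -> bool:
    """Allow each known factory serial to provision exactly once."""
    if device_id not in FACTORY_MANIFEST:
        return False   # not built by us -> revoke the issued certificate
    if device_id in provisioned:
        return False   # claim-cert replay for a known serial -> revoke
    provisioned.add(device_id)
    return True
```

This narrows the blast radius of a leaked claim cert from "unlimited rogue devices" to "only serials in the manifest that have not yet been claimed".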
Firmware Over-the-Air (OTA) Updates
Scenario: You need to patch a security vulnerability on 10 million devices running in production.
Architecture:
┌──────────────┐      ┌──────────────┐      ┌──────────────┐
│ Build CI/CD  │─────▶│ FW Registry  │─────▶│   CDN/S3     │
│ (signed fw)  │      │ (metadata DB)│      │ (binary blob)│
└──────────────┘      └──────┬───────┘      └──────────────┘
                             │
                             ▼
                      ┌──────────────┐
                      │ OTA Scheduler│
                      │   (canary/   │
                      │   rollout)   │
                      └──────┬───────┘
                             │
               ┌─────────────┼─────────────┐
               ▼             ▼             ▼
          [Device A]    [Device B]     [Device C]
        (health check)  (download)  (apply + verify)
Production-grade OTA Service (Go):
package ota
import (
	"context"
	"fmt"
	"time"
)

type FirmwareMetadata struct {
	Version      string
	MinVersion   string // Devices below this can't upgrade
	URL          string // CDN URL
	Size         int64
	SHA256       string
	Signature    string // RSA signature from build server
	RolloutPhase string // "canary", "staged", "general"
	CreatedAt    time.Time // Used to compute the staged rollout percentage
}
type OTAService struct {
registry DeviceRegistry
storage BlobStorage
metrics MetricsCollector
}
// Device polls for updates (pull model)
func (s *OTAService) CheckUpdate(ctx context.Context, deviceID string) (*FirmwareMetadata, error) {
device, err := s.registry.GetDevice(ctx, deviceID)
if err != nil {
return nil, err
}
// Get latest firmware for device type
latest, err := s.registry.GetLatestFirmware(device.Type, device.HardwareVersion)
if err != nil {
return nil, err
}
// Skip if device already up-to-date
	// NOTE: use a semver-aware comparison in production; a raw string
	// compare misorders versions like "1.10.0" vs "1.9.0"
	if device.FirmwareVersion >= latest.Version {
		return nil, nil
	}
// Canary strategy: only 1% of devices get latest during canary
if latest.RolloutPhase == "canary" && !s.isInCanaryGroup(deviceID, 0.01) {
return nil, nil // Skip this device for now
}
// Staged rollout: gradual percentage increase
if latest.RolloutPhase == "staged" {
percentage := s.getStagedPercentage(latest.CreatedAt)
if !s.isInCanaryGroup(deviceID, percentage) {
return nil, nil
}
}
return latest, nil
}
// Report update status (for monitoring)
func (s *OTAService) ReportUpdateStatus(ctx context.Context, deviceID, version, status string) error {
s.metrics.Increment("ota.status", map[string]string{
"version": version,
"status": status, // downloading, verifying, applying, success, failed
})
// If failure rate > 5%, auto-rollback
failureRate := s.metrics.GetRate("ota.status", map[string]string{
"version": version,
"status": "failed",
})
if failureRate > 0.05 {
s.registry.PauseFirmwareRollout(version)
s.alertOncall("High OTA failure rate for version %s", version)
}
return s.registry.UpdateDeviceFirmwareStatus(ctx, deviceID, version, status)
}
// Generate presigned URL for download (avoid hotlinking)
func (s *OTAService) GetDownloadURL(ctx context.Context, deviceID, version string) (string, error) {
// Verify device is authorized to download this version
fw, err := s.CheckUpdate(ctx, deviceID)
if err != nil || fw == nil || fw.Version != version {
return "", fmt.Errorf("unauthorized")
}
// Generate presigned URL (valid for 1 hour)
return s.storage.PresignedURL(fw.URL, 3600)
}
Device-side implementation (C for constrained devices):
#include <stdio.h>
#include <mbedtls/sha256.h>
#include <esp_https_ota.h>
#include <esp_ota_ops.h>  // esp_ota_get_running_partition / rollback APIs used in app_main
// Atomic update with rollback
typedef enum {
PARTITION_A,
PARTITION_B
} partition_t;
partition_t current_partition;
partition_t next_partition;
int perform_ota_update(const char* url, const char* expected_sha256) {
esp_http_client_config_t config = {
.url = url,
.cert_pem = server_cert_pem_start,
.timeout_ms = 10000,
};
esp_https_ota_config_t ota_config = {
.http_config = &config,
};
esp_https_ota_handle_t ota_handle = NULL;
// Step 1: Begin OTA (write to inactive partition)
esp_err_t err = esp_https_ota_begin(&ota_config, &ota_handle);
if (err != ESP_OK) {
report_status("failed", "ota_begin_failed");
return -1;
}
// Step 2: Download and verify in chunks
report_status("downloading", NULL);
while (1) {
err = esp_https_ota_perform(ota_handle);
if (err != ESP_ERR_HTTPS_OTA_IN_PROGRESS) {
break;
}
// Print progress (optional for debugging via serial)
}
if (err != ESP_OK) {
esp_https_ota_abort(ota_handle);
report_status("failed", "download_failed");
return -1;
}
// Step 3: Verify signature and hash
esp_app_desc_t new_app_info;
esp_https_ota_get_img_desc(ota_handle, &new_app_info);
// Compare SHA256
if (verify_sha256(ota_handle, expected_sha256) != 0) {
esp_https_ota_abort(ota_handle);
report_status("failed", "checksum_mismatch");
return -1;
}
// Step 4: Finalize (mark partition as bootable)
err = esp_https_ota_finish(ota_handle);
if (err != ESP_OK) {
report_status("failed", "ota_finish_failed");
return -1;
}
report_status("success", NULL);
// Step 5: Reboot to new partition
// Bootloader will try new partition, fallback to old if boot fails
esp_restart();
return 0;
}
// Bootloader marks partition as valid after successful boot
void app_main() {
// Verify this partition boots correctly
const esp_partition_t *running = esp_ota_get_running_partition();
esp_ota_img_states_t ota_state;
esp_ota_get_state_partition(running, &ota_state);
if (ota_state == ESP_OTA_IMG_PENDING_VERIFY) {
// First boot after OTA, mark as valid
esp_ota_mark_app_valid_cancel_rollback();
}
// Continue app initialization...
}
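The `verify_sha256` step above must hash the image incrementally, because a device with 256KB of RAM cannot hold a multi-megabyte firmware blob in memory. The chunked verification logic, sketched in Python:

```python
import hashlib

def verify_sha256_stream(chunks, expected_hex: str) -> bool:
    """Hash a firmware image chunk by chunk (as it streams in from the
    CDN) and compare against the expected digest from the metadata."""
    h = hashlib.sha256()
    for chunk in chunks:
        h.update(chunk)  # constant memory regardless of image size
    return h.hexdigest() == expected_hex

firmware_chunks = [b"chunk1", b"chunk2"]  # stand-in for HTTP stream reads
expected = hashlib.sha256(b"chunk1chunk2").hexdigest()
```

Note that the hash only proves integrity; the RSA signature check is what proves the image came from your build server and not a compromised CDN.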
Key decisions:
- Pull vs Push: Pull (device polls) scales better than push (server tracks all devices)
- Delta updates: Reduce bandwidth (binary diff), but complex implementation
- Dual-bank: Atomic updates with rollback, requires 2x flash size
- Canary rollout: Critical for catching bugs early (1% → 10% → 100%)
MQTT Protocol Deep Dive
MQTT is the most popular protocol for IoT because it is lightweight (2-byte minimum header) and uses a publish/subscribe model that decouples producers from consumers.
MQTT Architecture
                 ┌─────────────────────┐
                 │     MQTT Broker     │
                 │   (VerneMQ/EMQX)    │
                 └──────────┬──────────┘
                            │
      ┌─────────────────────┼─────────────────────┐
      │                     │                     │
[Publisher A]         [Publisher B]         [Subscriber C]
topic: sensors/       topic: sensors/       subscribe:
temp/room1            humidity/room1        sensors/+/room1

(all traffic flows through the broker - a shared bus)
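The `sensors/+/room1` subscription in the diagram works because the broker matches topics level by level: `+` matches exactly one level, `#` matches everything that remains (and must be the last level of the filter). A minimal sketch of that matching rule:

```python
def topic_matches(filter_: str, topic: str) -> bool:
    """MQTT wildcard matching: '+' matches one topic level, '#' matches
    the remainder and may only appear as the filter's last level."""
    flevels = filter_.split("/")
    tlevels = topic.split("/")
    for i, f in enumerate(flevels):
        if f == "#":
            return i == len(flevels) - 1  # '#' must be last
        if i >= len(tlevels):
            return False                  # filter is longer than topic
        if f != "+" and f != tlevels[i]:
            return False                  # literal level mismatch
    return len(flevels) == len(tlevels)   # no trailing topic levels left
```

So Subscriber C receives both `sensors/temp/room1` and `sensors/humidity/room1`, but not `sensors/temp/room2`.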
QoS Levels
QoS 0 (At most once):   Publisher ──PUBLISH──▶ Broker
                        (no ack; fire and forget)

QoS 1 (At least once):  Publisher ──PUBLISH──▶ Broker
                                  ◀──PUBACK───
                        (packet ID; may duplicate)

QoS 2 (Exactly once):   Publisher ──PUBLISH──▶ Broker
                                  ◀──PUBREC───
                                  ──PUBREL───▶
                                  ◀──PUBCOMP──
                        (no duplicates)
When to use:
- QoS 0: Telemetry (temperature every 10s) - losing a single sample is not critical
- QoS 1: Commands (turn on light) - duplicates OK (idempotent)
- QoS 2: Billing events (charge customer) - no duplicates allowed
Performance impact:
import paho.mqtt.client as mqtt
import time
def benchmark_qos():
    client = mqtt.Client()
    client.connect("localhost", 1883)
    client.loop_start()  # background network loop; wait_for_publish() blocks forever without it
payload = b"x" * 100 # 100 bytes
count = 10000
# QoS 0
start = time.time()
for i in range(count):
client.publish("benchmark/qos0", payload, qos=0)
qos0_time = time.time() - start
print(f"QoS 0: {count/qos0_time:.0f} msg/sec")
# QoS 1
start = time.time()
for i in range(count):
info = client.publish("benchmark/qos1", payload, qos=1)
info.wait_for_publish() # Wait for PUBACK
qos1_time = time.time() - start
print(f"QoS 1: {count/qos1_time:.0f} msg/sec")
# QoS 2
start = time.time()
for i in range(count):
info = client.publish("benchmark/qos2", payload, qos=2)
info.wait_for_publish() # Wait for PUBCOMP
qos2_time = time.time() - start
print(f"QoS 2: {count/qos2_time:.0f} msg/sec")
# Results (typical):
# QoS 0: ~100,000 msg/sec
# QoS 1: ~30,000 msg/sec
# QoS 2: ~10,000 msg/sec
Retained Messages & Last Will
Retained messages: Broker stores last value, new subscribers get it immediately.
# Publisher: Report device status
client.publish("devices/sensor123/status",
payload='{"online": true, "battery": 85}',
retain=True, # Store this value
qos=1)
# New subscriber connects 10 minutes later
client.subscribe("devices/+/status")
# Immediately receives retained status for all devices
Last Will and Testament (LWT): The broker auto-publishes a message when the client disconnects unexpectedly.
client = mqtt.Client()
# Set LWT before connect
client.will_set(
topic="devices/sensor123/status",
payload='{"online": false}',
qos=1,
retain=True
)
client.connect("broker.example.com")
# If network drops or device crashes, broker auto-publishes LWT
Use case: Device heartbeat monitoring
type DeviceMonitor struct {
redis *redis.Client
}
// MQTT subscriber for LWT messages
func (m *DeviceMonitor) OnMessage(topic string, payload []byte) {
	ctx := context.Background()
	deviceID := extractDeviceID(topic) // devices/sensor123/status
var status struct {
Online bool `json:"online"`
Battery int `json:"battery"`
}
json.Unmarshal(payload, &status)
if !status.Online {
// Device went offline
m.redis.Set(ctx, fmt.Sprintf("device:%s:online", deviceID), "0", 0)
// Trigger alert if offline > 5 minutes
m.scheduleOfflineAlert(deviceID, 5*time.Minute)
} else {
// Device back online
m.redis.Set(ctx, fmt.Sprintf("device:%s:online", deviceID), "1", 0)
m.redis.Set(ctx, fmt.Sprintf("device:%s:last_seen", deviceID), time.Now().Unix(), 0)
}
}
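The `scheduleOfflineAlert` helper above is hypothetical; the essential logic is a debounce: alert only if the device stays offline for the whole grace period, so a quick reconnect (which refreshes last-seen) suppresses the pending alert. A sketch of that check, with injected timestamps so it is testable:

```python
last_seen = {}  # device_id -> unix time of the last "online" status

def on_status(device_id: str, online: bool, now: float) -> None:
    """Fed by LWT/status messages; reconnects refresh last_seen."""
    if online:
        last_seen[device_id] = now

def offline_alert_due(device_id: str, now: float, grace: float = 300) -> bool:
    """True only if the device has been offline for the full grace period."""
    return now - last_seen.get(device_id, float("-inf")) >= grace
```

Run `offline_alert_due` when the scheduled timer fires; if the device reconnected in the meantime, the refreshed `last_seen` makes it return False and no alert goes out.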
MQTT Broker Clustering
Challenge: Scale MQTT broker to handle 1M concurrent connections.
VerneMQ clustering:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  VerneMQ 1  │◀───▶│  VerneMQ 2  │◀───▶│  VerneMQ 3  │
│ (100k conn) │     │ (100k conn) │     │ (100k conn) │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                   │
       └───────────────────┴───────────────────┘
             Plumtree gossip (CRDT metadata)
Config (vernemq.conf):
# Clustering
nodename = VerneMQ1@192.168.1.101
distributed_cookie = secret_cluster_cookie
# Join cluster
cluster.join = VerneMQ2@192.168.1.102
# Performance tuning
max_connections = 100000
max_inflight_messages = 100
max_message_size = 65536
# Persistent session in database (distributed)
metadata_plugin = vmq_swc
vmq_swc.db_backend = leveldb
Subscription routing: When a device subscribes on node A but the message is published on node B:

Device A ──subscribe("sensors/#")──▶ Node A
Device B ──publish("sensors/temp")─▶ Node B
                                       │
         Node A ◀───────gossip─────────┘
           │
           └───deliver───▶ Device A
Trade-offs:
- Shared subscriptions ($share/group/topic): Load balancing across consumers
- Routing overhead: Inter-node traffic (minimize with local_only topics)
Telemetry Ingestion
High-Throughput Message Pipeline
Scenario: 100,000 devices x 1 message/sec = 100k msg/sec sustained load.
Architecture:
        MQTT Broker (cluster)
                │
                ▼
      ┌───────────────────┐
      │  Message Router   │  (Kafka topic: iot.telemetry)
      │   Kafka/Kinesis   │   - Partitions: 50
      │                   │   - Replication: 3
      └─────────┬─────────┘   - Retention: 7 days
                │
    ┌───────────┼────────────┬──────────────┐
    ▼           ▼            ▼              ▼
┌────────┐ ┌─────────┐ ┌───────────┐ ┌───────────┐
│ Stream │ │  Time-  │ │ Analytics │ │ Archival  │
│ Process│ │ Series  │ │  Engine   │ │   (S3)    │
│ (Flink)│ │ Ingest  │ │  (Druid)  │ │           │
└────────┘ └─────────┘ └───────────┘ └───────────┘
MQTT → Kafka bridge:
package bridge
import (
	"context"
	"encoding/json"
	"strings"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/segmentio/kafka-go"
)
type MQTTKafkaBridge struct {
mqttClient mqtt.Client
kafkaWriter *kafka.Writer
}
func NewBridge(mqttBroker, kafkaBroker string) *MQTTKafkaBridge {
// Kafka writer with batching
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(kafkaBroker),
Topic: "iot.telemetry",
Balancer: &kafka.Hash{}, // Partition by device ID
BatchSize: 100, // Batch 100 messages
BatchTimeout: 10 * time.Millisecond,
Compression: kafka.Snappy,
Async: true, // Don't block MQTT
}
// MQTT client
opts := mqtt.NewClientOptions()
opts.AddBroker(mqttBroker)
opts.SetClientID("kafka-bridge")
opts.SetCleanSession(false)
opts.SetAutoReconnect(true)
opts.SetMaxReconnectInterval(60 * time.Second)
	mqttClient := mqtt.NewClient(opts)
	if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error()) // bridge cannot operate without a broker connection
	}
bridge := &MQTTKafkaBridge{
mqttClient: mqttClient,
kafkaWriter: kafkaWriter,
}
// Subscribe to all telemetry
mqttClient.Subscribe("telemetry/#", 0, bridge.onMessage)
return bridge
}
func (b *MQTTKafkaBridge) onMessage(client mqtt.Client, msg mqtt.Message) {
// Extract device ID from topic (telemetry/{device_id})
parts := strings.Split(msg.Topic(), "/")
if len(parts) != 2 {
return
}
deviceID := parts[1]
// Enrich with metadata
envelope := TelemetryEnvelope{
DeviceID: deviceID,
Timestamp: time.Now().UnixNano(),
Payload: msg.Payload(),
Topic: msg.Topic(),
}
bytes, _ := json.Marshal(envelope)
// Write to Kafka (async, batched)
b.kafkaWriter.WriteMessages(context.Background(),
kafka.Message{
Key: []byte(deviceID), // Partition by device ID
Value: bytes,
},
)
}
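Setting the Kafka message key to the device ID (with the hash balancer above) is what preserves per-device ordering: a stable hash sends every message from the same device to the same partition. A sketch of the idea (illustrative only; Kafka's default partitioner uses murmur2, not SHA-256):

```python
import hashlib

NUM_PARTITIONS = 50  # matches the topic configuration above

def partition_for(device_id: str, partitions: int = NUM_PARTITIONS) -> int:
    """Stable hash of the key -> the same device always lands on the
    same partition, so its messages are consumed in publish order."""
    h = int.from_bytes(hashlib.sha256(device_id.encode()).digest()[:8], "big")
    return h % partitions
```

The trade-off: a handful of extremely chatty devices can hot-spot a partition, since the assignment ignores per-device volume.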
Backpressure Handling
Problem: Kafka cluster hiccup → messages buffer in memory → OOM.
Solution: Circuit breaker + adaptive rate limiting.
type AdaptiveRateLimiter struct {
	successCount int64
	errorCount   int64
	rate         int64 // Current allowed rate (msg/sec)
	maxRate      int64
	bucket       *rate.Limiter // Token bucket (golang.org/x/time/rate)
	lastAdjust   time.Time
}
func (r *AdaptiveRateLimiter) Allow() bool {
	// Token bucket algorithm
	if !r.bucket.Allow() {
		return false
	}
	// Adjust rate every 10 seconds
	if time.Since(r.lastAdjust) > 10*time.Second {
		r.adjustRate()
	}
	return true
}
func (r *AdaptiveRateLimiter) adjustRate() {
success := atomic.LoadInt64(&r.successCount)
errors := atomic.LoadInt64(&r.errorCount)
total := success + errors
if total == 0 {
return
}
errorRate := float64(errors) / float64(total)
currentRate := atomic.LoadInt64(&r.rate)
var newRate int64
if errorRate > 0.05 {
// Error rate > 5%, reduce by 20%
newRate = int64(float64(currentRate) * 0.8)
} else if errorRate < 0.01 {
// Error rate < 1%, increase by 10%
newRate = int64(float64(currentRate) * 1.1)
if newRate > r.maxRate {
newRate = r.maxRate
}
} else {
newRate = currentRate
}
atomic.StoreInt64(&r.rate, newRate)
atomic.StoreInt64(&r.successCount, 0)
atomic.StoreInt64(&r.errorCount, 0)
r.lastAdjust = time.Now()
	log.Printf("Adjusted rate: %d -> %d msg/sec (error rate: %.2f%%)",
		currentRate, newRate, errorRate*100)
}
// Integration with bridge
func (b *MQTTKafkaBridge) onMessage(client mqtt.Client, msg mqtt.Message) {
if !b.rateLimiter.Allow() {
// Drop message (or buffer to disk)
metrics.Increment("telemetry.dropped")
return
}
err := b.kafkaWriter.WriteMessages(...)
if err != nil {
b.rateLimiter.RecordError()
} else {
b.rateLimiter.RecordSuccess()
}
}
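The `r.bucket.Allow()` call in the Go sketch assumes a token bucket underneath: tokens refill at a fixed rate up to a burst capacity, and each message spends one token. A minimal self-contained version (timestamps injectable so the refill math is visible):

```python
import time

class TokenBucket:
    """Minimal token bucket: `rate` tokens/sec refill, `burst` capacity."""
    def __init__(self, rate: float, burst: float, now: float = None):
        self.rate = rate
        self.capacity = burst
        self.tokens = burst                      # start full
        self.last = time.monotonic() if now is None else now

    def allow(self, now: float = None) -> bool:
        now = time.monotonic() if now is None else now
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

The adaptive layer then simply rewrites `rate` every 10 seconds based on the observed Kafka error rate, which is much cheaper than per-message feedback.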
Protocol Comparison
| Feature | MQTT | HTTP | CoAP | AMQP |
|---|---|---|---|---|
| Transport | TCP | TCP | UDP | TCP |
| Overhead | 2 bytes | ~200 bytes | 4 bytes | ~8 bytes |
| QoS | 0/1/2 | None (app-level) | NON/CON (≈0/1) | Ack/settlement modes |
| Pub/Sub | Native | Webhooks | Observe | Native |
| NAT/Firewall | Good (MQTT-SN for UDP) | Good | Poor (UDP) | Good |
| Battery | Excellent | Poor (HTTP/1.1) | Excellent | Good |
| Use Case | General IoT | Web-friendly | Constrained devices | Enterprise |
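The "2 bytes" overhead figure for MQTT in the table refers to the fixed header of a QoS 0 PUBLISH: one control byte plus a one-byte remaining-length varint (for packets under 128 bytes). A sketch that builds such a packet byte by byte (MQTT 3.1.1 framing, simplified to single-byte remaining length):

```python
def encode_publish(topic: str, payload: bytes) -> bytes:
    """Minimal MQTT 3.1.1 QoS 0 PUBLISH packet, small-message case only."""
    t = topic.encode()
    remaining = 2 + len(t) + len(payload)  # topic length prefix + topic + payload
    assert remaining < 128                 # single-byte varint in this sketch
    fixed = bytes([0x30, remaining])       # 0x30 = PUBLISH, QoS 0, no flags
    return fixed + len(t).to_bytes(2, "big") + t + payload

pkt = encode_publish("a/b", b'{"v":23.5}')
```

Compare that with HTTP, where the request line and mandatory headers alone typically cost ~200 bytes before any payload.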
Code example - CoAP client:
from coapthon.client.helperclient import HelperClient
# CoAP uses UDP, very lightweight
client = HelperClient(server=('coap.example.com', 5683))
# Publish telemetry (CON = confirmable, like QoS 1)
response = client.post('sensors/temp',
payload='{"value": 23.5}',
timeout=5)
print(response.pretty_print())
# Observe (like MQTT subscribe)
client.observe('actuators/fan', callback=lambda r: print(r.payload))
Time-Series Data Storage
Storage Engine Comparison
InfluxDB (popular choice):
┌───────────────────────────────────────┐
│         InfluxDB Architecture         │
├───────────────────────────────────────┤
│ WAL (Write-Ahead Log)                 │ ← Durability
├───────────────────────────────────────┤
│ Cache (in-memory writes)              │ ← Fast writes
├───────────────────────────────────────┤
│ TSM Files (Time-Structured Merge)     │ ← Columnar storage
│   - data/                             │
│     - 00001.tsm (1h block)            │
│     - 00002.tsm (1h block)            │
│   - index/                            │
│     - tag index (inverted)            │
└───────────────────────────────────────┘
Schema design:
-- Measurement = table name
-- Tags = indexed metadata (low cardinality)
-- Fields = actual values (not indexed)
-- Timestamp = automatic
-- Example: Store temperature readings
-- DON'T: use device_id as field (can't filter efficiently)
-- DO: use device_id as tag
INSERT temperature,device_id=sensor123,room=bedroom value=23.5
-- Query by tag (fast, uses index)
SELECT value FROM temperature
WHERE device_id='sensor123'
AND time > now() - 1h
-- Aggregate
SELECT MEAN(value) FROM temperature
WHERE room='bedroom'
AND time > now() - 24h
GROUP BY time(10m), device_id
Cardinality explosion (common pitfall):
# BAD: High cardinality tag
# 1M devices × 1000 rooms = 1B tag combinations → slow
influx.write_points([{
"measurement": "temperature",
"tags": {
"device_id": "sensor12345", # 1M unique values
"room": "bedroom_floor2_apt501" # 1000 unique values
},
"fields": {"value": 23.5}
}])
# GOOD: Keep tags low cardinality, move high-cardinality to fields
influx.write_points([{
"measurement": "temperature",
"tags": {
"building_id": "building_42", # 100 buildings
"floor": "2" # 20 floors
},
"fields": {
"device_id": "sensor12345", # Field, not indexed
"value": 23.5
}
}])
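The reason the first schema is dangerous is that worst-case series cardinality is the product of the unique values of every tag. A one-line estimator makes the difference concrete:

```python
from math import prod

def series_cardinality(tag_values: dict) -> int:
    """Worst-case series count = product of unique values per tag."""
    return prod(len(v) for v in tag_values.values())

bad = series_cardinality({"device_id": range(1_000_000), "room": range(1000)})
good = series_cardinality({"building_id": range(100), "floor": range(20)})
print(bad, "vs", good)  # a billion potential series vs two thousand
```

Since InfluxDB keeps an in-memory index entry per series, the first schema can exhaust RAM on the database node long before disk becomes a problem.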
Write optimization (batching):
package ingestion
import (
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
)
type TelemetryWriter struct {
client influxdb2.Client
writeAPI api.WriteAPI
buffer []*write.Point
mu sync.Mutex
}
func NewTelemetryWriter(url, token, org, bucket string) *TelemetryWriter {
client := influxdb2.NewClient(url, token)
writeAPI := client.WriteAPI(org, bucket)
// Handle async errors
errorsCh := writeAPI.Errors()
go func() {
for err := range errorsCh {
log.Printf("InfluxDB write error: %v", err)
}
}()
return &TelemetryWriter{
client: client,
writeAPI: writeAPI,
buffer: make([]*write.Point, 0, 1000),
}
}
func (w *TelemetryWriter) WriteTelemetry(deviceID string, measurement string, value float64) {
p := influxdb2.NewPointWithMeasurement(measurement).
AddTag("device_id", deviceID).
AddField("value", value).
SetTime(time.Now())
// Async write (batched automatically by client)
w.writeAPI.WritePoint(p)
}
// For very high throughput, manual batching gives more control
func (w *TelemetryWriter) WriteBatch(points []*write.Point) error {
// Write in chunks of 5000 (InfluxDB recommended batch size)
const batchSize = 5000
for i := 0; i < len(points); i += batchSize {
end := i + batchSize
if end > len(points) {
end = len(points)
}
batch := points[i:end]
w.writeAPI.WritePoint(batch...)
}
// Flush to ensure all writes complete
w.writeAPI.Flush()
return nil
}
Retention & Downsampling
Problem: Raw data (1 sample/sec) = 86,400 points/day/device. 1M devices = 86B points/day → storage costs explode.
Solution: Continuous queries for downsampling + retention policies.
-- Create retention policies
CREATE RETENTION POLICY "raw" ON "iot_db" DURATION 7d REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "downsampled_1h" ON "iot_db" DURATION 90d REPLICATION 1
CREATE RETENTION POLICY "downsampled_1d" ON "iot_db" DURATION 5y REPLICATION 1
-- Continuous query: auto-downsample raw data to 1h averages
CREATE CONTINUOUS QUERY "cq_downsample_1h" ON "iot_db"
BEGIN
SELECT MEAN(value) AS value
INTO "downsampled_1h"."temperature"
FROM "raw"."temperature"
GROUP BY time(1h), device_id
END
-- Continuous query: 1d averages
CREATE CONTINUOUS QUERY "cq_downsample_1d" ON "iot_db"
BEGIN
SELECT MEAN(value) AS value
INTO "downsampled_1d"."temperature"
FROM "downsampled_1h"."temperature"
GROUP BY time(1d), device_id
END
Storage savings:
Raw (7 days):          1M devices × 86,400 points/day × 7d  = 604B points
1h downsampled (90d):  1M devices × 24 points/day × 90d     = 2.16B points
1d downsampled (5y):   1M devices × 1 point/day × 1,825d    = 1.83B points
──────────────────────────────────────────────────────────────────────
Total: ~608B points (vs ~158T points if storing raw for 5 years)
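A quick back-of-envelope check of the tier sizes above, using the same assumptions (1M devices, 1 sample/sec raw, hourly and daily averages):

```python
DEVICES = 1_000_000

def points(per_day: int, days: int) -> int:
    """Total points stored for one retention tier across all devices."""
    return DEVICES * per_day * days

raw    = points(86_400, 7)     # 1 sample/sec kept for 7 days
hourly = points(24, 90)        # 1h averages kept for 90 days
daily  = points(1, 1825)       # 1d averages kept for 5 years
total  = raw + hourly + daily

print(f"{raw=:,} {hourly=:,} {daily=:,} {total=:,}")
```

Almost all of the footprint is the 7-day raw tier, which is exactly the tier you can shrink further (shorter raw retention, coarser sampling) if costs bite.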
TimescaleDB Alternative
Postgres extension (better for relational queries):
-- Create hypertable (auto-partitioning by time)
CREATE TABLE telemetry (
time TIMESTAMPTZ NOT NULL,
device_id TEXT NOT NULL,
metric TEXT NOT NULL,
value DOUBLE PRECISION
);
SELECT create_hypertable('telemetry', 'time');
-- Create index on device_id (for device-specific queries)
CREATE INDEX ON telemetry (device_id, time DESC);
-- Compression (10x reduction typical)
ALTER TABLE telemetry SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'device_id',
timescaledb.compress_orderby = 'time DESC'
);
-- Auto-compress chunks older than 7 days
SELECT add_compression_policy('telemetry', INTERVAL '7 days');
-- Continuous aggregate (like InfluxDB CQ)
CREATE MATERIALIZED VIEW telemetry_1h
WITH (timescaledb.continuous) AS
SELECT device_id,
time_bucket('1 hour', time) AS bucket,
AVG(value) as avg_value,
MAX(value) as max_value,
MIN(value) as min_value
FROM telemetry
GROUP BY device_id, bucket;
-- Refresh policy
SELECT add_continuous_aggregate_policy('telemetry_1h',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
Query performance:
-- Query raw data (last hour)
EXPLAIN ANALYZE
SELECT time, value
FROM telemetry
WHERE device_id = 'sensor123'
AND time > NOW() - INTERVAL '1 hour';
-- Planning Time: 0.5ms
-- Execution Time: 12ms (3600 points)
-- Query aggregated data (last month)
EXPLAIN ANALYZE
SELECT bucket, avg_value
FROM telemetry_1h
WHERE device_id = 'sensor123'
AND bucket > NOW() - INTERVAL '30 days';
-- Planning Time: 0.3ms
-- Execution Time: 3ms (720 points, pre-aggregated)
Edge Computing
Edge Gateway Architecture
Why edge: Reduce bandwidth, lower latency, work offline.
                    Cloud
                      ▲
                      │ (aggregated data)
           ┌──────────┴──────────┐
           │                     │
     Edge Gateway          Edge Gateway
    (Raspberry Pi)        (Industrial PC)
           │                     │
      ┌────┴────┐           ┌────┴────┐
      │         │           │         │
  [Sensor]  [Sensor]      [PLC]   [Camera]
   (BLE)    (Zigbee)     (Modbus)  (RTSP)
Edge processing pipeline:
# Running on edge gateway (e.g., Raspberry Pi 4)
import asyncio
import json
import logging
import time
from collections import deque

import numpy as np
from azure.iot.device import Message
from azure.iot.device.aio import IoTHubModuleClient

log = logging.getLogger(__name__)

class EdgeTelemetryProcessor:
    def __init__(self):
        self.client = IoTHubModuleClient.create_from_edge_environment()
        self.buffer = deque(maxlen=1000)  # Local buffer
        self.anomaly_detector = AnomalyDetector()  # defined elsewhere
async def process_sensor_reading(self, device_id, value):
# Step 1: Local buffering (survive network outage)
self.buffer.append({
'device_id': device_id,
'value': value,
'timestamp': time.time()
})
# Step 2: Local anomaly detection
is_anomaly = self.anomaly_detector.detect(device_id, value)
if is_anomaly:
# Critical: send to cloud immediately
await self.send_to_cloud({
'device_id': device_id,
'value': value,
'alert': 'anomaly_detected',
'priority': 'high'
})
else:
# Normal: aggregate and batch
# Only send summary every 5 minutes
pass
async def flush_aggregates(self):
"""Send aggregated data to cloud every 5 minutes"""
while True:
await asyncio.sleep(300) # 5 minutes
# Aggregate by device
aggregates = {}
for reading in list(self.buffer):
device_id = reading['device_id']
if device_id not in aggregates:
aggregates[device_id] = []
aggregates[device_id].append(reading['value'])
# Send summary
for device_id, values in aggregates.items():
summary = {
'device_id': device_id,
'count': len(values),
'mean': np.mean(values),
'min': np.min(values),
'max': np.max(values),
'std': np.std(values)
}
await self.send_to_cloud(summary)
self.buffer.clear()
async def send_to_cloud(self, data):
try:
msg = Message(json.dumps(data))
await self.client.send_message_to_output(msg, "telemetryOutput")
except Exception as e:
# Network error, keep in buffer
log.error(f"Failed to send to cloud: {e}")
Bandwidth savings:
Without edge aggregation:
100 sensors × 1 sample/sec × 60 sec × 100 bytes = 600 KB/min ≈ 864 MB/day
With edge aggregation:
100 sensors × 1 summary/5min × 200 bytes = 20 KB/5min ≈ 5.76 MB/day
Savings: ~99.3% bandwidth reduction
Edge-Cloud Sync
Azure IoT Edge (deployment.json):
{
"modulesContent": {
"$edgeAgent": {
"properties.desired": {
"modules": {
"telemetryProcessor": {
"version": "1.0",
"type": "docker",
"status": "running",
"restartPolicy": "always",
"settings": {
"image": "acr.io/telemetry-processor:latest",
"createOptions": {
"HostConfig": {
"Binds": ["/data:/data"]
}
}
}
},
"localDatabase": {
"version": "1.0",
"type": "docker",
"status": "running",
"settings": {
"image": "timescale/timescaledb:latest"
}
}
}
}
},
"$edgeHub": {
"properties.desired": {
"routes": {
"sensorToProcessor": "FROM /messages/modules/sensorModule/* INTO BrokeredEndpoint(\"/modules/telemetryProcessor/inputs/sensorInput\")",
"processorToCloud": "FROM /messages/modules/telemetryProcessor/outputs/telemetryOutput INTO $upstream"
},
"storeAndForwardConfiguration": {
"timeToLiveSecs": 604800
}
}
}
}
}
Offline-first design:
package edge
import (
	"database/sql"
	"fmt"
	"strings"
	"time"

	_ "github.com/mattn/go-sqlite3"
)
type OfflineBuffer struct {
db *sql.DB
}
func NewOfflineBuffer() *OfflineBuffer {
db, _ := sql.Open("sqlite3", "/data/offline_buffer.db")
// Create table for buffered messages
db.Exec(`CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER,
payload BLOB,
retry_count INTEGER DEFAULT 0
)`)
return &OfflineBuffer{db: db}
}
func (b *OfflineBuffer) Store(payload []byte) error {
_, err := b.db.Exec(
"INSERT INTO messages (timestamp, payload) VALUES (?, ?)",
time.Now().Unix(), payload,
)
return err
}
func (b *OfflineBuffer) Flush(cloudClient CloudClient) error {
rows, err := b.db.Query(
"SELECT id, payload FROM messages ORDER BY timestamp LIMIT 100",
)
if err != nil {
return err
}
defer rows.Close()
var successIDs []int64
for rows.Next() {
var id int64
var payload []byte
rows.Scan(&id, &payload)
err := cloudClient.Send(payload)
if err == nil {
successIDs = append(successIDs, id)
} else {
// Increment retry count
b.db.Exec("UPDATE messages SET retry_count = retry_count + 1 WHERE id = ?", id)
}
}
// Delete successfully sent messages
if len(successIDs) > 0 {
placeholders := strings.Repeat("?,", len(successIDs)-1) + "?"
query := fmt.Sprintf("DELETE FROM messages WHERE id IN (%s)", placeholders)
args := make([]interface{}, len(successIDs))
for i, id := range successIDs {
args[i] = id
}
b.db.Exec(query, args...)
}
return nil
}
Device Twins / Shadows
Concept: Server-side representation of device state, accessible even when device offline.
Device (actual state)              Cloud (desired state)
─────────────────────              ─────────────────────
{                                  {
  "firmware": "1.2.0",               "firmware": "1.3.0",  ← Cloud wants upgrade
  "config": {                        "config": {
    "interval": 60                     "interval": 30      ← Cloud wants faster reports
  }                                  }
}                                  }

Device syncs:
1. Receives desired state from cloud
2. Applies changes (upgrade firmware, update config)
3. Reports new actual state
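The delta the device receives in step 1 is, conceptually, "desired minus reported": every desired key whose value differs from (or is missing in) the reported state. A minimal sketch of that computation (flat keys only; real shadow services diff nested documents too):

```python
def compute_delta(desired: dict, reported: dict) -> dict:
    """Keys the device still needs to act on: desired values that the
    reported state does not yet match."""
    return {k: v for k, v in desired.items() if reported.get(k) != v}

delta = compute_delta(
    {"firmware": "1.3.0", "interval": 30},
    {"firmware": "1.2.0", "interval": 30},
)
```

Once the device applies the change and reports `"firmware": "1.3.0"`, the delta becomes empty and the shadow is converged.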
AWS IoT Device Shadow
Shadow document structure:
{
"state": {
"desired": {
"temperature_threshold": 25,
"reporting_interval": 30,
"led_brightness": 80
},
"reported": {
"temperature_threshold": 20,
"reporting_interval": 60,
"led_brightness": 50,
"firmware_version": "1.2.0",
"battery": 75
}
},
"metadata": {
"desired": {
"temperature_threshold": {
"timestamp": 1678901234
}
},
"reported": {
"battery": {
"timestamp": 1678901200
}
}
},
"version": 42,
"timestamp": 1678901234
}
Device-side implementation (Python):
import json
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient
class DeviceController:
def __init__(self, thing_name):
self.shadow_client = AWSIoTMQTTShadowClient("device123")
self.shadow_client.configureEndpoint("abc123.iot.us-west-2.amazonaws.com", 8883)
self.shadow_client.configureCredentials(
"root-ca.pem", "private.key", "certificate.pem"
)
self.shadow_client.connect()
self.device_shadow = self.shadow_client.createShadowHandlerWithName(thing_name, True)
# Listen for desired state changes
self.device_shadow.shadowRegisterDeltaCallback(self.on_delta)
# Initial sync
        self.device_shadow.shadowGet(self.on_shadow_update, 5)
    def on_delta(self, payload, responseStatus, token):
        """Called when cloud updates desired state"""
        delta = json.loads(payload)
        print(f"Received delta: {delta}")
        # Apply changes and collect only the keys that were actually applied
        applied = {}
        if "temperature_threshold" in delta["state"]:
            new_threshold = delta["state"]["temperature_threshold"]
            self.update_config("temp_threshold", new_threshold)
            applied["temperature_threshold"] = new_threshold
        if "reporting_interval" in delta["state"]:
            new_interval = delta["state"]["reporting_interval"]
            self.update_config("interval", new_interval)
            applied["reporting_interval"] = new_interval
        # Report back the new state (only what changed)
        if applied:
            self.report_state(applied)
def report_state(self, state):
"""Report current device state to cloud"""
payload = {
"state": {
"reported": state
}
}
self.device_shadow.shadowUpdate(json.dumps(payload), self.on_shadow_update, 5)
def on_shadow_update(self, payload, responseStatus, token):
if responseStatus == "accepted":
print("Shadow updated successfully")
else:
print(f"Shadow update failed: {responseStatus}")
Cloud-side: Update desired state (Go):
package shadow
import (
	"encoding/json"
	"fmt"
	"sync"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/iotdataplane"
)
type ShadowService struct {
	iot      *iotdataplane.IoTDataPlane
	registry DeviceRegistry // device lookup by building; interface defined elsewhere
}
func (s *ShadowService) UpdateDesiredState(thingName string, desired map[string]interface{}) error {
payload := map[string]interface{}{
"state": map[string]interface{}{
"desired": desired,
},
}
	jsonPayload, err := json.Marshal(payload)
	if err != nil {
		return err
	}
_, err := s.iot.UpdateThingShadow(&iotdataplane.UpdateThingShadowInput{
ThingName: aws.String(thingName),
Payload: jsonPayload,
})
return err
}
// Example: Remotely configure all devices in a building
func (s *ShadowService) ConfigureBuilding(buildingID string, config map[string]interface{}) error {
// Get all devices in building
devices, err := s.registry.GetDevicesByBuilding(buildingID)
if err != nil {
return err
}
// Update shadows in parallel
var wg sync.WaitGroup
errCh := make(chan error, len(devices))
for _, device := range devices {
wg.Add(1)
go func(deviceID string) {
defer wg.Done()
err := s.UpdateDesiredState(deviceID, config)
if err != nil {
errCh <- err
}
}(device.ID)
}
wg.Wait()
close(errCh)
// Check for errors
var errors []error
for err := range errCh {
errors = append(errors, err)
}
if len(errors) > 0 {
return fmt.Errorf("failed to update %d devices", len(errors))
}
return nil
}
Conflict Resolution
Problem: Device offline for 1 hour, during which cloud updates desired state 10 times. Device comes back online.
Strategy 1: Last-write-wins (LWW)
def resolve_conflict(reported, desired, metadata):
    """Per-key last-write-wins using the shadow's per-key metadata timestamps."""
    resolved = {}
    for key in set(reported) | set(desired):
        reported_ts = metadata.get("reported", {}).get(key, {}).get("timestamp", 0)
        desired_ts = metadata.get("desired", {}).get(key, {}).get("timestamp", 0)
        if reported_ts >= desired_ts:
            # Device state is newer for this key, keep reported
            resolved[key] = reported.get(key, desired.get(key))
        else:
            # Cloud state is newer for this key, apply desired
            resolved[key] = desired[key]
    return resolved
Strategy 2: Merge (for non-conflicting keys)
def merge_states(reported, desired):
merged = reported.copy()
for key, value in desired.items():
if key not in reported:
# New key from cloud, apply it
merged[key] = value
elif reported[key] != value:
# Conflict, cloud wins
merged[key] = value
return merged
Strategy 3: Vector clocks (advanced)
type VectorClock map[string]int
func (vc VectorClock) HappensBefore(other VectorClock) bool {
	strictlyLess := false
	// Every component of vc must be <= the corresponding component of other
	for node, timestamp := range vc {
		otherTimestamp := other[node]
		if timestamp > otherTimestamp {
			return false // vc is ahead on this node, so it cannot happen before
		}
		if timestamp < otherTimestamp {
			strictlyLess = true
		}
	}
	// Nodes present only in other also count as strictly-less components
	for node, otherTimestamp := range other {
		if _, ok := vc[node]; !ok && otherTimestamp > 0 {
			strictlyLess = true
		}
	}
	return strictlyLess
}
// Usage in shadow
type DeviceShadow struct {
State map[string]interface{}
VectorClock VectorClock
}
func (s *DeviceShadow) Merge(other *DeviceShadow) {
if s.VectorClock.HappensBefore(other.VectorClock) {
// Other is newer, replace
s.State = other.State
s.VectorClock = other.VectorClock
} else if other.VectorClock.HappensBefore(s.VectorClock) {
// Self is newer, keep current
return
} else {
// Concurrent updates, manual merge required
s.State = mergeConflict(s.State, other.State)
s.VectorClock = mergeVectorClocks(s.VectorClock, other.VectorClock)
}
}
Security
Device Authentication
Certificate-based (X.509) - industry standard:
ββββββββββββββββ
β Root CA β (offline, stored in HSM)
ββββββββ¬ββββββββ
β
ββββββββΌββββββββ
β Intermediate β (signs device certs)
β CA β
ββββββββ¬ββββββββ
β
ββββββββββββ¬βββββββββββ¬βββββββββββ
βΌ βΌ βΌ βΌ
[Device A] [Device B] [Device C] [Device D]
(unique (unique (unique (unique
cert) cert) cert) cert)
Certificate generation (offline, during manufacturing):
#!/bin/bash
# Generate device certificate
DEVICE_ID=$1
# Generate private key (stays on device, never transmitted)
openssl genrsa -out ${DEVICE_ID}.key 2048
# Generate CSR (Certificate Signing Request)
openssl req -new -key ${DEVICE_ID}.key -out ${DEVICE_ID}.csr \
-subj "/CN=${DEVICE_ID}/O=MyCompany/C=US"
# Sign with intermediate CA
openssl x509 -req -in ${DEVICE_ID}.csr \
-CA intermediate-ca.crt -CAkey intermediate-ca.key \
-CAcreateserial -out ${DEVICE_ID}.crt \
-days 3650 -sha256
# Embed cert + root CA in device firmware
cat ${DEVICE_ID}.crt intermediate-ca.crt root-ca.crt > ${DEVICE_ID}-chain.pem
Device-side TLS connection (C):
#include <mbedtls/ssl.h>
#include <mbedtls/net_sockets.h>
#include <mbedtls/entropy.h>
#include <mbedtls/ctr_drbg.h>
int connect_to_broker() {
mbedtls_ssl_context ssl;
mbedtls_ssl_config conf;
mbedtls_x509_crt cacert, clicert;
mbedtls_pk_context pkey;
// Load device certificate and private key
mbedtls_x509_crt_parse_file(&clicert, "/certs/device.crt");
mbedtls_pk_parse_keyfile(&pkey, "/certs/device.key", NULL);
// Load root CA
mbedtls_x509_crt_parse_file(&cacert, "/certs/root-ca.crt");
// Configure SSL
mbedtls_ssl_config_defaults(&conf,
MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT);
mbedtls_ssl_conf_ca_chain(&conf, &cacert, NULL);
mbedtls_ssl_conf_own_cert(&conf, &clicert, &pkey);
// Verify server certificate
mbedtls_ssl_conf_authmode(&conf, MBEDTLS_SSL_VERIFY_REQUIRED);
// Connect
mbedtls_net_context server_fd;
mbedtls_net_connect(&server_fd, "mqtt.example.com", "8883", MBEDTLS_NET_PROTO_TCP);
mbedtls_ssl_setup(&ssl, &conf);
mbedtls_ssl_set_bio(&ssl, &server_fd, mbedtls_net_send, mbedtls_net_recv, NULL);
// TLS handshake
int ret = mbedtls_ssl_handshake(&ssl);
if (ret != 0) {
printf("TLS handshake failed: -0x%x\n", -ret);
return -1;
}
// Verify peer certificate
uint32_t flags = mbedtls_ssl_get_verify_result(&ssl);
if (flags != 0) {
printf("Certificate verification failed\n");
return -1;
}
printf("TLS connection established\n");
return 0;
}
Secure Boot
Purpose: Prevent malicious firmware from running.
ββββββββββββββββββββββββββββββββββββββββββ
β Secure Boot Chain β
ββββββββββββββββββββββββββββββββββββββββββ€
β 1. ROM Bootloader (immutable) β
β - Verify Stage 2 bootloader sig β
β - Public key burned in OTP fuses β
ββββββββββββββββββββββββββββββββββββββββββ€
β 2. Stage 2 Bootloader β
β - Verify app firmware signature β
β - Check rollback protection β
ββββββββββββββββββββββββββββββββββββββββββ€
β 3. Application Firmware β
β - Run only if signature valid β
ββββββββββββββββββββββββββββββββββββββββββ
ESP32 secure boot (illustrative sketch; real ESP-IDF boot APIs differ slightly):
// Bootloader verification (in Stage 2)
#include "esp_secure_boot.h"
#include "esp_image_format.h"
esp_err_t verify_and_boot() {
const esp_partition_t *partition = esp_partition_find_first(
ESP_PARTITION_TYPE_APP, ESP_PARTITION_SUBTYPE_ANY, NULL);
// Read app image
    esp_image_metadata_t metadata;
    esp_partition_pos_t pos = { .offset = partition->address, .size = partition->size };
    esp_err_t err = esp_image_verify(ESP_IMAGE_VERIFY, &pos, &metadata);
if (err != ESP_OK) {
// Signature verification failed
ESP_LOGE("boot", "Invalid firmware signature");
// Try backup partition
partition = esp_partition_find_first(
ESP_PARTITION_TYPE_APP, ESP_PARTITION_SUBTYPE_APP_OTA_1, NULL);
        pos.offset = partition->address;
        pos.size = partition->size;
        err = esp_image_verify(ESP_IMAGE_VERIFY, &pos, &metadata);
        if (err != ESP_OK) {
            // Both partitions invalid: halt instead of boot-looping
            abort();
        }
}
// Check rollback version (prevent downgrade attacks)
if (metadata.secure_version < get_secure_version()) {
ESP_LOGE("boot", "Firmware version too old (rollback protection)");
esp_restart();
}
// Boot into verified app
esp_image_load(partition->address);
return ESP_OK;
}
Encryption at Rest
Problem: Device stolen, attacker extracts flash memory.
Solution: Flash encryption (hardware-accelerated):
// Enable flash encryption (ESP32) — simplified; on real hardware this is driven
// by eFuse configuration and the bootloader rather than a single runtime call
esp_err_t enable_flash_encryption() {
// Generate 256-bit AES key in eFuse (one-time programmable)
esp_flash_encryption_init();
// Encrypt all partitions
const esp_partition_t *partition = esp_partition_find_first(
ESP_PARTITION_TYPE_APP, ESP_PARTITION_SUBTYPE_ANY, NULL);
esp_flash_encrypt_region(partition->address, partition->size);
// Lock eFuse (key cannot be read out)
esp_efuse_write_key(EFUSE_BLK_KEY0, ESP_EFUSE_KEY_PURPOSE_FLASH_ENCRYPTION);
return ESP_OK;
}
// Data is automatically decrypted on read, encrypted on write
// Attacker reading raw flash gets only encrypted data
Cloud-side: Encryption at rest (AWS IoT):
// Store device credentials encrypted with KMS before persisting
import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/kms"
)
func storeDeviceSecret(deviceID, secret string) error {
// Encrypt with KMS
kmsClient := kms.New(session.Must(session.NewSession()))
encrypted, err := kmsClient.Encrypt(&kms.EncryptInput{
KeyId: aws.String("arn:aws:kms:us-west-2:123456789:key/abcd-1234"),
Plaintext: []byte(secret),
EncryptionContext: map[string]*string{
"device_id": aws.String(deviceID),
},
})
if err != nil {
return err
}
	// Store encrypted blob in DynamoDB (dynamoDB: package-level *dynamodb.DynamoDB client)
	_, err = dynamoDB.PutItem(&dynamodb.PutItemInput{
		TableName: aws.String("device_secrets"),
		Item: map[string]*dynamodb.AttributeValue{
			"device_id":        {S: aws.String(deviceID)},
			"encrypted_secret": {B: encrypted.CiphertextBlob},
		},
	})
	return err
}
Analytics & Anomaly Detection
Stream Processing for Real-time Alerts
Apache Flink pipeline:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Read from Kafka
FlinkKafkaConsumer<TelemetryEvent> consumer = new FlinkKafkaConsumer<>(
"iot.telemetry",
new TelemetrySchema(),
properties
);
DataStream<TelemetryEvent> telemetry = env.addSource(consumer);
// Detect temperature anomalies
DataStream<Alert> alerts = telemetry
.keyBy(event -> event.getDeviceId())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new AnomalyDetector());
// Send alerts
alerts.addSink(new AlertSink());
env.execute("IoT Anomaly Detection");
Anomaly detector (statistical approach):
public class AnomalyDetector extends ProcessWindowFunction<TelemetryEvent, Alert, String, TimeWindow> {
@Override
public void process(String deviceId, Context context, Iterable<TelemetryEvent> events, Collector<Alert> out) {
List<Double> values = new ArrayList<>();
for (TelemetryEvent event : events) {
values.add(event.getValue());
}
// Calculate statistics
double mean = calculateMean(values);
double stdDev = calculateStdDev(values, mean);
        // Check for outliers (Z-score > 3); guard against zero variance
        if (stdDev == 0) {
            return;
        }
        for (TelemetryEvent event : events) {
            double zScore = Math.abs((event.getValue() - mean) / stdDev);
            if (zScore > 3.0) {
out.collect(new Alert(
deviceId,
event.getTimestamp(),
event.getValue(),
"Statistical anomaly detected (Z-score: " + zScore + ")"
));
}
}
}
}
Machine Learning for Predictive Maintenance
Use case: Predict motor failure from vibration patterns.
Training pipeline (Python + TensorFlow):
import tensorflow as tf
from tensorflow import keras
import pandas as pd
import numpy as np
# Load historical data (labeled: normal vs failure)
df = pd.read_csv('motor_vibration_data.csv')
# Columns: device_id, timestamp, vibration_x, vibration_y, vibration_z, temperature, label
# Feature engineering
def extract_features(window):
"""Extract features from 1-minute window"""
return {
'mean_vibration': np.mean(window[['vibration_x', 'vibration_y', 'vibration_z']].values),
'std_vibration': np.std(window[['vibration_x', 'vibration_y', 'vibration_z']].values),
'max_vibration': np.max(window[['vibration_x', 'vibration_y', 'vibration_z']].values),
'fft_peak': get_fft_peak_frequency(window['vibration_x'].values),
'temperature': np.mean(window['temperature'].values),
}
# Build LSTM model
model = keras.Sequential([
keras.layers.LSTM(64, input_shape=(60, 5), return_sequences=True), # 60 timesteps
keras.layers.Dropout(0.2),
keras.layers.LSTM(32),
keras.layers.Dropout(0.2),
keras.layers.Dense(16, activation='relu'),
keras.layers.Dense(1, activation='sigmoid') # Failure probability
])
model.compile(optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy', keras.metrics.AUC()])
# Train
history = model.fit(X_train, y_train,
epochs=50,
batch_size=32,
validation_data=(X_val, y_val))
# Export model
model.save('motor_failure_predictor.h5')
Inference at edge (TensorFlow Lite for microcontrollers):
# Convert to TFLite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
# Deploy to edge device (C++)
# ...
Or inference in cloud (real-time):
import json

import numpy as np
import pandas as pd
import tensorflow as tf
from kafka import KafkaConsumer

model = tf.keras.models.load_model('motor_failure_predictor.h5')
consumer = KafkaConsumer('iot.telemetry', bootstrap_servers='localhost:9092')

sample_buffer = []   # raw samples within the current minute
feature_buffer = []  # one engineered 5-dim feature vector per minute

for message in consumer:
    telemetry = json.loads(message.value)
    sample_buffer.append(telemetry)
    if len(sample_buffer) < 60:  # buffer one minute of samples (1 Hz)
        continue
    # Same feature engineering as training: 5 scalars per 1-minute window
    f = extract_features(pd.DataFrame(sample_buffer))
    feature_buffer.append([f['mean_vibration'], f['std_vibration'],
                           f['max_vibration'], f['fft_peak'], f['temperature']])
    sample_buffer = []
    if len(feature_buffer) < 60:  # LSTM expects 60 timesteps
        continue
    X = np.array([feature_buffer])  # shape (1, 60, 5)
    # Predict
    failure_probability = model.predict(X)[0][0]
    if failure_probability > 0.8:
        # High risk, send alert
        send_alert(telemetry['device_id'],
                   f"Predicted failure risk: {failure_probability:.2%}")
    feature_buffer = feature_buffer[1:]  # sliding window of minutes
Scalability & Cost Optimization
Sharding Strategies
Problem: 10M concurrent devices are far more than a single MQTT broker can handle.
Solution: shard by device ID.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Load Balancer (HAProxy) β
β - Read device_id from MQTT CONNECT packet β
β - Hash(device_id) % num_shards β shard β
ββββββββββββββ¬βββββββββββββββ¬ββββββββββββββ¬βββββββββββ
β β β
βΌ βΌ βΌ
ββββββββββββ ββββββββββββ ββββββββββββ
β Shard 1 β β Shard 2 β β Shard 3 β
β (VerneMQ)β β (VerneMQ)β β (VerneMQ)β
ββββββββββββ ββββββββββββ ββββββββββββ
device_id device_id device_id
% 3 == 0 % 3 == 1 % 3 == 2
HAProxy config:
global
log /dev/log local0
maxconn 1000000
frontend mqtt_front
bind *:1883
mode tcp
option tcplog
# Extract device_id from MQTT CONNECT
tcp-request inspect-delay 5s
tcp-request content accept if { req.len gt 0 }
    # Hash-based backend selection (HAProxy 2.4+ MQTT converters)
    use_backend shard_%[req.payload(0,0),mqtt_field_value(connect,client_identifier),crc32,mod(3)]
backend shard_0
mode tcp
balance leastconn
server mqtt1 10.0.1.10:1883 check
backend shard_1
mode tcp
balance leastconn
server mqtt2 10.0.1.11:1883 check
backend shard_2
mode tcp
balance leastconn
server mqtt3 10.0.1.12:1883 check
Resharding (inevitable as you scale):
# Gradual resharding to minimize disruption
# Note: with modulo hashing, changing the shard count remaps most devices,
# so route probabilistically during migration and keep both shard sets live
import random

class ReshardController:
    def __init__(self, old_shards, new_shards):
        self.old_shards = old_shards
        self.new_shards = new_shards
        self.migration_percentage = 0
def get_shard(self, device_id):
# During migration, probabilistically route to new shards
if random.random() < self.migration_percentage:
return hash(device_id) % self.new_shards
else:
return hash(device_id) % self.old_shards
def increase_migration(self, percentage):
"""Gradually increase traffic to new shards"""
self.migration_percentage = min(1.0, percentage)
# Update load balancer config
self.update_haproxy_config()
# Migration plan:
# Day 1: 0% β new shards (deploy new shards)
# Day 2: 10% β new shards
# Day 3: 30% β new shards
# Day 4: 50% β new shards
# Day 5: 100% β new shards (decommission old shards)
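Modulo sharding remaps almost every device whenever the shard count changes, which is why the migration above needs so much care. A consistent-hash ring is a common alternative that moves only about 1/N of devices when a shard is added. A self-contained sketch (illustrative, not tied to any particular broker; shard names and vnode count are made up):

```python
import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, shards, vnodes=100):
        # vnodes: virtual nodes per shard, smooths the key distribution
        self.ring = sorted(
            (self._hash(f"{shard}-{i}"), shard)
            for shard in shards for i in range(vnodes)
        )
        self.keys = [h for h, _ in self.ring]

    @staticmethod
    def _hash(value):
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def get_shard(self, device_id):
        # First ring position clockwise from the device's hash
        idx = bisect.bisect(self.keys, self._hash(device_id)) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["shard-0", "shard-1", "shard-2"])
before = {f"device-{i}": ring.get_shard(f"device-{i}") for i in range(1000)}

# Adding a fourth shard moves only the keys that now land on it (~25%),
# instead of the ~75% that hash(id) % n remaps
ring4 = ConsistentHashRing(["shard-0", "shard-1", "shard-2", "shard-3"])
moved = sum(1 for d, s in before.items() if ring4.get_shard(d) != s)
```

Devices that move all move to the new shard; the rest keep their broker, so only a bounded fraction of connections must migrate.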
Cold Storage & Data Lifecycle
Tiered storage:
Hot Tier (0-7 days): InfluxDB (SSD) - Full-resolution
Warm Tier (7-90 days): S3 + Athena - 1h aggregates
Cold Tier (90d-5y): S3 Glacier - 1d aggregates
Archive (>5y): Delete or tape backup - Regulatory only
Auto-archival job:
package archival
import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)
type ArchivalJob struct {
influx influxdb2.Client
s3 *s3.S3
}
func (j *ArchivalJob) ArchiveOldData() error {
// Query data older than 7 days
query := `
from(bucket: "iot_telemetry")
|> range(start: -14d, stop: -7d)
|> aggregateWindow(every: 1h, fn: mean)
`
result, err := j.influx.QueryAPI("myorg").Query(context.Background(), query)
if err != nil {
return err
}
	// Write to a Parquet file (createParquetWriter: project helper, e.g. wrapping parquet-go)
	parquetFile := createParquetWriter("telemetry_archive.parquet")
for result.Next() {
record := result.Record()
parquetFile.Write(record)
}
parquetFile.Close()
	// Upload to S3
	file, err := os.Open("telemetry_archive.parquet")
	if err != nil {
		return err
	}
	defer file.Close()
_, err = j.s3.PutObject(&s3.PutObjectInput{
Bucket: aws.String("iot-archive"),
Key: aws.String(fmt.Sprintf("year=%d/month=%d/telemetry.parquet",
time.Now().Year(), time.Now().Month())),
Body: file,
StorageClass: aws.String("GLACIER"), // Cheap long-term storage
})
	// Delete from InfluxDB (Flux has no delete(); use the client's Delete API)
	if err == nil {
		err = j.influx.DeleteAPI().DeleteWithName(context.Background(),
			"myorg", "iot_telemetry",
			time.Now().AddDate(0, 0, -14), time.Now().AddDate(0, 0, -7),
			"")
	}
	return err
}
Query archived data (Athena):
-- Create external table on S3
CREATE EXTERNAL TABLE telemetry_archive (
  device_id STRING,
  `timestamp` BIGINT,  -- backticked: TIMESTAMP is reserved in Athena DDL
  value DOUBLE
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET
LOCATION 's3://iot-archive/';
-- Query (billed by data scanned, cheap if partitioned well)
SELECT device_id, AVG(value) as avg_value
FROM telemetry_archive
WHERE year = 2025 AND month = 3
AND device_id = 'sensor123'
GROUP BY device_id;
Cost Optimization Techniques
1. Protocol efficiency:
HTTP POST: ~500 bytes overhead (headers)
MQTT Publish: ~2 bytes overhead (minimal header)
1M devices Γ 1 msg/min Γ 500 bytes Γ 60 min Γ 24h Γ 30d = 21.6 TB/month (HTTP)
1M devices Γ 1 msg/min Γ 2 bytes Γ 60 min Γ 24h Γ 30d = 86 GB/month (MQTT)
Data transfer cost (AWS): $0.09/GB
HTTP: $1,944/month
MQTT: $7.74/month
Savings: 99.6%
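The arithmetic above is easy to sanity-check; a few lines reproduce the monthly figures (same assumptions: 1M devices, 1 msg/min, $0.09/GB transfer):

```python
DEVICES = 1_000_000
MSGS_PER_MONTH = DEVICES * 60 * 24 * 30   # 1 msg/min for 30 days
RATE_PER_GB = 0.09

def monthly_cost(overhead_bytes):
    """Transfer volume (GB) and cost for a given per-message overhead."""
    gb = MSGS_PER_MONTH * overhead_bytes / 1e9
    return gb, gb * RATE_PER_GB

http_gb, http_cost = monthly_cost(500)  # ~500 B of HTTP headers
mqtt_gb, mqtt_cost = monthly_cost(2)    # ~2 B MQTT fixed header
# http_gb = 21600.0 GB (21.6 TB), mqtt_gb = 86.4 GB
```

Note this counts protocol overhead only; the payload itself is the same on both, so the 99.6% figure is the savings on overhead bytes, not on total traffic.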
2. Compression:
import zlib
import json
# Raw telemetry
data = {
"device_id": "sensor123",
"timestamp": 1678901234,
"temperature": 23.5,
"humidity": 65.2,
"pressure": 1013.25
}
raw = json.dumps(data).encode()
print(f"Raw: {len(raw)} bytes") # ~120 bytes
# Compressed
compressed = zlib.compress(raw)
print(f"Compressed: {len(compressed)} bytes") # ~80 bytes
print(f"Ratio: {len(compressed)/len(raw):.1%}") # 66.7%
# For time-series, binary formats even better
import struct
binary = struct.pack('>Qfff',
data['timestamp'],
data['temperature'],
data['humidity'],
data['pressure'])
print(f"Binary: {len(binary)} bytes") # 20 bytes β 83% reduction
3. Batching:
# Instead of sending 60 individual messages (1/min for 1 hour)
# Send 1 batch message every hour
# Individual messages
for i in range(60):
client.publish(topic, json.dumps({"value": values[i]}))
# Cost: 60 Γ $0.000001/msg = $0.00006
# Batched
batch = {"values": values, "interval": 60}
client.publish(topic, json.dumps(batch))
# Cost: 1 Γ $0.000001/msg = $0.000001
# Savings: 98.3%
4. Sampling:
# Adaptive sampling: send more data when changing, less when stable
class AdaptiveSampler:
def __init__(self, threshold=0.5):
self.last_value = None
self.threshold = threshold
def should_send(self, value):
if self.last_value is None:
self.last_value = value
return True
# Send if change > threshold
if abs(value - self.last_value) > self.threshold:
self.last_value = value
return True
return False
# Example: Temperature stable at 23Β°C for 1 hour
sampler = AdaptiveSampler(threshold=0.5)
for temp in [23.0, 23.1, 23.0, 23.2, ..., 23.1]: # 60 readings
if sampler.should_send(temp):
client.publish("temp", str(temp))
# Result: Only 2-3 messages sent (when temp changes > 0.5Β°C)
# Instead of 60 messages
# Savings: 95%
Interview Questions
Q1: Design an IoT platform for 10 million smart thermostats
Requirements:
- 10M devices, each sends temperature every 60 seconds
- Users can view current temp and historical graphs
- Users can set target temperature remotely
- 99.9% uptime
Calculation:
Traffic:
- Telemetry: 10M devices Γ 1 msg/60s = 166k msg/sec
- Commands: ~1k/sec (users adjusting temp)
- Queries: ~10k/sec (users viewing dashboards)
Data:
- Per message: 50 bytes (device_id, timestamp, temp)
- Per day: 166k msg/sec × 50 bytes × 86400 sec ≈ 720 GB/day
- Per year: ≈ 263 TB/year
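These back-of-envelope figures can be reproduced in a few lines (same assumptions: 50-byte messages, one per device per minute):

```python
DEVICES = 10_000_000
MSG_BYTES = 50

msgs_per_sec = DEVICES / 60                              # one reading per device per minute
gb_per_day = msgs_per_sec * MSG_BYTES * 86_400 / 1e9     # bytes/day -> GB
tb_per_year = gb_per_day * 365 / 1000
# ≈ 166,667 msg/sec, ≈ 720 GB/day, ≈ 263 TB/year
```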
Architecture:
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Devices (10M) β
βββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββββββββ
β MQTT/TLS
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β MQTT Broker Cluster (VerneMQ, 100 nodes, sharded) β
β - 100k connections/node β
β - QoS 0 for telemetry (fire-and-forget) β
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Kafka (50 partitions, 3x replication) β
β - Topic: telemetry (retention: 7 days) β
β - Topic: commands (retention: 30 days) β
ββββββββ¬βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββ
β β
βΌ βΌ
βββββββββββββββ ββββββββββββββββββββββββββ
β Stream β β Device Shadow Serviceβ
β Processor β β (Redis, device state)β
β (Flink) β βββββββββββββ¬βββββββββββββ
β - Anomaly β β
β - Alerts β β
ββββββββ¬βββββββ β
β β
βΌ βΌ
ββββββββββββββββββ βββββββββββββββββββ
β InfluxDB β β PostgreSQL β
β (time-series) β β (metadata, β
β - Raw: 7d β β users, auth) β
β - 1h: 90d β βββββββββββββββββββ
β - 1d: 5y β
ββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β API Gateway (GraphQL/REST) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
[Web/Mobile App]
Key decisions:
- MQTT QoS 0: for temperature readings, one lost sample is not critical
- Sharding: 100 MQTT brokers × 100k connections = 10M
- Kafka: decouples ingestion from processing, replayable
- Redis for shadows: fast reads (current temp), TTL for offline devices
- InfluxDB downsampling: raw data is expensive, downsample after 7 days
Bottlenecks & scaling:
- MQTT broker: horizontal scaling with sharding
- Kafka: increase partitions (50 → 100)
- InfluxDB: clustering (Enterprise) or migrate to TimescaleDB
- API: stateless, scale horizontally
Q2: How do you handle firmware OTA for 1M devices without bricking them?
Strategy:
1. Canary rollout (1% β 10% β 100%)
2. Health checks (post-update boot verification)
3. Automatic rollback (bootloader dual-partition)
4. Gradual rollout with kill switch
Implementation:
class OTAController:
def rollout_firmware(self, version, target_devices):
# Phase 1: Canary (1%, 10,000 devices)
canary_devices = sample(target_devices, 10000)
self.update_devices(canary_devices, version)
# Monitor for 24 hours
time.sleep(86400)
# Check failure rate
failures = self.get_failure_count(version)
failure_rate = failures / 10000
if failure_rate > 0.05: # 5% failure threshold
# Abort rollout
self.pause_rollout(version)
self.alert_oncall(f"High failure rate: {failure_rate:.1%}")
return
# Phase 2: Staged (10%, 100k devices)
staged_devices = sample(target_devices, 100000)
self.update_devices(staged_devices, version)
time.sleep(43200) # Monitor for 12 hours
failures = self.get_failure_count(version)
failure_rate = failures / 100000
if failure_rate > 0.02:
self.pause_rollout(version)
return
# Phase 3: General availability (remaining 890k devices)
# Gradual rollout over 7 days to avoid thundering herd
        remaining = list(set(target_devices) - set(canary_devices) - set(staged_devices))
for day in range(7):
batch = remaining[day * 127142 : (day+1) * 127142]
self.update_devices(batch, version)
time.sleep(86400)
Device-side health check:
void app_main() {
// Check if this is first boot after OTA
esp_ota_img_states_t state;
const esp_partition_t *running = esp_ota_get_running_partition();
esp_ota_get_state_partition(running, &state);
if (state == ESP_OTA_IMG_PENDING_VERIFY) {
// Perform health checks
bool wifi_ok = test_wifi_connection();
bool mqtt_ok = test_mqtt_connection();
bool sensors_ok = test_all_sensors();
if (wifi_ok && mqtt_ok && sensors_ok) {
// Mark firmware as valid
esp_ota_mark_app_valid_cancel_rollback();
report_ota_success();
} else {
// Health check failed, reboot to rollback
ESP_LOGE("OTA", "Health check failed, rolling back");
esp_restart(); // Bootloader will boot previous partition
}
}
// Normal operation...
}
Q3: Optimize cost: 1M devices sending 1 msg/min costs $50k/month. Reduce to $5k.
Current cost breakdown:
Assumptions:
- 1M devices Γ 60 msg/hour Γ 24h = 1.44B msg/day
- MQTT broker: AWS IoT Core = $1 per 1M messages
- Data transfer: $0.09/GB
- Storage: InfluxDB Cloud = $0.50/GB-month
Costs:
- Messages: 1.44B msg/day × 30d × $1/1M = $43,200/month
- Data transfer: 43.2B msg/month × 100 bytes = 4.32 TB × $0.09/GB ≈ $389/month
- Storage: 4.32 TB × $0.50/GB-month = $2,160/month
Total: ≈ $45,750/month
Optimization 1: Edge aggregation (reduce messages by 60x)
# Instead of 60 individual messages/hour, send 1 aggregate
class EdgeAggregator:
def __init__(self):
self.buffer = []
def add_reading(self, value):
self.buffer.append(value)
if len(self.buffer) >= 60: # 1 hour of data
self.send_aggregate()
def send_aggregate(self):
msg = {
"count": len(self.buffer),
"mean": np.mean(self.buffer),
"min": np.min(self.buffer),
"max": np.max(self.buffer)
}
client.publish("telemetry", json.dumps(msg))
self.buffer = []
# New cost:
# Messages: 1.44B / 60 = 24M msg/day Γ 30d Γ $1/1M = $720/month
# Savings: $42,480/month
Optimization 2: Compression (reduce data transfer by 70%)
import zlib
compressed_msg = zlib.compress(json.dumps(msg).encode())
# New cost:
# Data transfer: $389 × 0.3 ≈ $117/month
# Savings: ≈ $272/month
Optimization 3: Self-hosted MQTT broker (eliminate per-message cost)
Deploy VerneMQ on EC2:
- 10Γ c5.2xlarge instances ($0.34/hour Γ 10 Γ 730h = $2,482/month)
- Handle 1M concurrent connections
- Zero per-message cost
Savings: $43,200 - $2,482 = $40,718/month
Optimization 4: Downsampling & cold storage
-- Keep raw data for 7 days only
CREATE RETENTION POLICY "raw" ON "iot_db" DURATION 7d REPLICATION 1 DEFAULT
-- 1h aggregates for 90 days (60x size reduction)
-- 1d aggregates for 5 years (1440x size reduction)
-- New storage:
Raw (7d): 4.32 TB / 30 Γ 7 = 1.01 TB
1h (90d): 1.01 TB / 60 Γ 90/7 = 0.22 TB
1d (5y): 0.22 TB / 24 Γ 1825/90 = 0.19 TB
Total: 1.42 TB
Cost: 1.42 TB Γ $0.50 = $710/month
Savings: $1,450/month
Total optimized cost:
Messages: $720/month (edge aggregation)
Data transfer: $117/month (compression)
Broker: $2,482/month (self-hosted)
Storage: $710/month (downsampling)
────────────────────────────────────
Total: ≈ $4,030/month
Original: ≈ $45,750/month
Optimized: ≈ $4,030/month
Savings: ≈ 91% (≈ $41,700/month)
Summary
Key points to remember:
- Scale = sharding: you cannot scale vertically forever; shard MQTT brokers and databases
- Constrained devices: battery, bandwidth, memory → optimize protocols (MQTT > HTTP), compression, edge processing
- Reliability: QoS levels, device shadows, offline buffering, dual-partition OTA
- Security: certificate-based auth, TLS, secure boot, flash encryption
- Cost: edge aggregation, compression, batching, downsampling, cold storage
- Time-series DB: InfluxDB/TimescaleDB are specialized for IoT workloads
- Real-time: stream processing (Flink/Kafka) for alerts and analytics
- Edge computing: reduces bandwidth (up to 99%), lowers latency, works offline
Common pitfalls:
- Cardinality explosion: using high-cardinality values as tags in InfluxDB
- Thundering herd: all devices reconnecting at once after a network outage
- No rollback: OTA updates without a fallback → bricked devices
- Stateful brokers: losing connection state when a broker restarts
- Ignoring offline: devices will go offline; you need offline buffering
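The thundering-herd pitfall has a standard mitigation: devices reconnect with jittered exponential backoff, so a fleet-wide outage does not turn into a synchronized reconnect storm when the network returns. A minimal sketch (full-jitter variant; `base` and `cap` values are illustrative):

```python
import random

def reconnect_delay(attempt, base=1.0, cap=300.0):
    """Full-jitter exponential backoff: uniform over [0, min(cap, base * 2^attempt)].

    Spreads a fleet's reconnects across the window instead of hitting
    the broker at the same instant after an outage ends.
    """
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# A device that has failed 5 times waits somewhere in [0, 32] seconds;
# from ~attempt 9 onward the window is capped at 5 minutes
delays = [reconnect_delay(a) for a in range(10)]
```

Server-side, the same effect can be reinforced by having brokers reject excess CONNECTs with a retry-later response during recovery.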
Trade-offs to discuss in interviews:
| Decision | Option A | Option B | When to use A | When to use B |
|---|---|---|---|---|
| Protocol | MQTT | HTTP | Constrained devices, constant connection | Web-friendly, firewall traversal |
| QoS | 0 (at most once) | 1 (at least once) | Telemetry (lossy OK) | Commands (must arrive) |
| Storage | InfluxDB | TimescaleDB | Pure time-series | Need relational joins |
| Processing | Edge | Cloud | Low latency, bandwidth limited | Complex analytics, ML |
| Sharding | By device ID | By geography | Even distribution | Data locality |
References
Books:
- "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 11: Stream Processing)
- "Building the Internet of Things" - Maciej Kranz
- "IoT Inc" - Bruce Sinclair
Open-source projects:
- VerneMQ - Scalable MQTT broker
- Eclipse Mosquitto - Lightweight MQTT broker
- Apache IoTDB - Time-series database for IoT
Blogs:
- HiveMQ Blog - MQTT deep dives
- InfluxData Blog - Time-series best practices
- AWS IoT Blog
Communities:
- MQTT Community
- r/IOT - Reddit IoT community
- IoT Stack Exchange