🏭 Domains · ✍️ Khoa · 📅 19/04/2026 · ☕ 38 min read

IoT Platform - System Design cho Senior Engineers

Introduction

If you're designing an IoT platform, you're facing one of the hardest distributed-systems problems there is: managing millions (or billions) of devices with constrained hardware, unreliable connectivity, and real-time processing requirements at low latency.

IoT-Specific Challenges

Traditional Web API                      vs          IoT Platform
───────────────────                                  ────────────
• 1M users/day                                       • 100M devices always-on
• Rich clients (browser/mobile)                      • Constrained devices (256KB RAM)
• Stable connectivity                                • Intermittent network (2G/3G)
• Request-response model                             • Push + bidirectional
• Stateless architecture                             • Stateful (device shadows)
• Human-generated data                               • High-frequency telemetry

Key challenges:

  • Scale: Billions of devices, millions of concurrent connections
  • Hardware constraints: 256KB RAM, 32KB flash, 8-bit CPU
  • Connectivity: Intermittent, high latency (satellite), low bandwidth
  • Power: Battery-powered devices (years of lifetime)
  • Security: Physical access to devices, firmware vulnerabilities
  • Heterogeneity: Protocols (MQTT, CoAP, HTTP), data formats, vendors

IoT Platform Architecture (High-Level)

┌─────────────────────────────────────────────────────────────────┐
│                        IoT Devices Layer                        │
│  [Sensors] [Actuators] [Edge Gateways] [Industrial Equipment]   │
└────────────┬───────────────────────────┬────────────────────────┘
             │ MQTT/CoAP/HTTP            │ TLS/DTLS
             ▼                           ▼
┌────────────────────────┐    ┌──────────────────────┐
│   Protocol Gateway     │    │   Device Gateway     │
│  (MQTT Broker Cluster) │    │  (Load Balancer)     │
└────────┬───────────────┘    └──────────┬───────────┘
         │                               │
         ▼                               ▼
┌─────────────────────────────────────────────────────┐
│              Message Router / Event Hub             │
│        (Kafka / Kinesis / Azure Event Hub)          │
└─────┬──────────────┬──────────────┬─────────────────┘
      │              │              │
      ▼              ▼              ▼
┌──────────┐  ┌─────────────┐  ┌─────────────┐
│ Telemetry│  │   Command   │  │   Device    │
│ Ingestion│  │   Service   │  │   Registry  │
└────┬─────┘  └──────┬──────┘  └──────┬──────┘
     │               │                │
     ▼               ▼                ▼
┌────────────┐  ┌──────────┐  ┌─────────────┐
│ Time-Series│  │  Device  │  │   Postgres  │
│ DB (Influx)│  │  Shadows │  │  (metadata) │
└────────────┘  └──────────┘  └─────────────┘

Device Management

Device Provisioning at Scale

Challenge: How do you provision 1 million devices without manually configuring each one?

Strategies:

  1. Zero-Touch Provisioning (ZTP)
Device Boot → DHCP → Download config from server → Authenticate → Activate
  2. Certificate-based Auto-enrollment (like AWS IoT Just-in-Time Registration)
# Device-side: Generate CSR and request certificate
import json
import ssl
import paho.mqtt.client as mqtt

def provision_device():
    # Step 1: Generate key pair (done once in factory)
    device_id = get_hardware_id()  # MAC address, serial number
    
    # Step 2: Connect with claim certificate (embedded in firmware)
    client = mqtt.Client(client_id=device_id)
    client.tls_set(
        ca_certs="root-ca.pem",
        certfile="claim-cert.pem",  # Factory-issued
        keyfile="claim-key.pem"
    )
    
    # Step 3: Publish provisioning request
    client.connect("provisioning.iot.example.com", 8883)
    client.publish(
        "$aws/certificates/create-from-csr/json",
        payload=json.dumps({
            "csr": generate_csr(device_id),
            "template": "production-template"
        })
    )
    
    # Step 4: Receive unique certificate
    # Server validates claim cert → issues device cert → stores in registry

# Platform-side: Auto-registration Lambda
def on_certificate_created(event):
    cert_id = event['certificateId']
    device_id = extract_cn(event['csr'])
    
    # Validate against whitelist (factory manifest)
    if not is_authorized(device_id):
        revoke_certificate(cert_id)
        return
    
    # Create device in registry
    create_device(device_id, cert_id)
    
    # Attach policy (publish/subscribe permissions)
    attach_policy(cert_id, "device-policy")
    
    # Initialize device shadow
    create_shadow(device_id, {"state": {"desired": {}}})

Trade-offs:

  • Claim cert: Simpler, nhΖ°ng nαΊΏu leak thΓ¬ attacker cΓ³ thể provision rogue devices
  • Pre-registration: Secure hΖ‘n (whitelist serial numbers), nhΖ°ng cαΊ§n manual upload manifest
  • Public Key Infrastructure (PKI): Scalable nhαΊ₯t, nhΖ°ng complex (CA hierarchy, CRL/OCSP)

Firmware Over-the-Air (OTA) Updates

Scenario: You need to patch a security vulnerability on 10 million devices running in production.

Architecture:

┌──────────────┐       ┌──────────────┐       ┌──────────────┐
│  Build CI/CD │──────▶│  FW Registry │──────▶│   CDN/S3     │
│  (signed fw) │       │ (metadata DB)│       │ (binary blob)│
└──────────────┘       └──────┬───────┘       └──────────────┘
                              │
                              ▼
                       ┌──────────────┐
                       │ OTA Scheduler│
                       │ (canary/     │
                       │  rollout)    │
                       └──────┬───────┘
                              │
                ┌─────────────┼─────────────┐
                ▼             ▼             ▼
           [Device A]    [Device B]    [Device C]
         (health check) (download)    (apply + verify)

Production-grade OTA Service (Go):

package ota

import (
    "context"
    "fmt"
    "time"
)

type FirmwareMetadata struct {
    Version      string
    MinVersion   string    // Devices below this can't upgrade
    URL          string    // CDN URL
    Size         int64
    SHA256       string
    Signature    string    // RSA signature from build server
    RolloutPhase string    // "canary", "staged", "general"
    CreatedAt    time.Time // Rollout start, used to compute the staged percentage
}

type OTAService struct {
    registry DeviceRegistry
    storage  BlobStorage
    metrics  MetricsCollector
}

// Device polls for updates (pull model)
func (s *OTAService) CheckUpdate(ctx context.Context, deviceID string) (*FirmwareMetadata, error) {
    device, err := s.registry.GetDevice(ctx, deviceID)
    if err != nil {
        return nil, err
    }
    
    // Get latest firmware for device type
    latest, err := s.registry.GetLatestFirmware(device.Type, device.HardwareVersion)
    if err != nil {
        return nil, err
    }
    
    // Skip if device already up-to-date
    // (lexicographic compare; use proper semver parsing for real version strings)
    if device.FirmwareVersion >= latest.Version {
        return nil, nil
    }
    
    // Canary strategy: only 1% of devices get latest during canary
    if latest.RolloutPhase == "canary" && !s.isInCanaryGroup(deviceID, 0.01) {
        return nil, nil  // Skip this device for now
    }
    
    // Staged rollout: gradual percentage increase
    if latest.RolloutPhase == "staged" {
        percentage := s.getStagedPercentage(latest.CreatedAt)
        if !s.isInCanaryGroup(deviceID, percentage) {
            return nil, nil
        }
    }
    
    return latest, nil
}

// Report update status (for monitoring)
func (s *OTAService) ReportUpdateStatus(ctx context.Context, deviceID, version, status string) error {
    s.metrics.Increment("ota.status", map[string]string{
        "version": version,
        "status":  status,  // downloading, verifying, applying, success, failed
    })
    
    // If failure rate > 5%, auto-rollback
    failureRate := s.metrics.GetRate("ota.status", map[string]string{
        "version": version,
        "status":  "failed",
    })
    
    if failureRate > 0.05 {
        s.registry.PauseFirmwareRollout(version)
        s.alertOncall("High OTA failure rate for version %s", version)
    }
    
    return s.registry.UpdateDeviceFirmwareStatus(ctx, deviceID, version, status)
}

// Generate presigned URL for download (avoid hotlinking)
func (s *OTAService) GetDownloadURL(ctx context.Context, deviceID, version string) (string, error) {
    // Verify device is authorized to download this version
    fw, err := s.CheckUpdate(ctx, deviceID)
    if err != nil || fw == nil || fw.Version != version {
        return "", fmt.Errorf("unauthorized")
    }
    
    // Generate presigned URL (valid for 1 hour)
    return s.storage.PresignedURL(fw.URL, 3600)
}
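CheckUpdate above leans on isInCanaryGroup, which must be deterministic: the same device has to land in the same bucket on every poll, otherwise the canary population churns between requests. A hash-based bucketing sketch (the scheme itself is an assumption; it mirrors, not reproduces, the Go helper):

```python
import hashlib

def in_rollout_group(device_id: str, fw_version: str, percentage: float) -> bool:
    """Deterministically map a device to [0, 1) and compare with the rollout percentage.

    Hashing the device ID together with the firmware version reshuffles the
    canary population on every release, so the same devices aren't always
    the guinea pigs.
    """
    digest = hashlib.sha256(f"{device_id}:{fw_version}".encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64  # uniform in [0, 1)
    return bucket < percentage

# A 1% canary selects roughly 1% of a 100k-device fleet
fleet = [f"sensor{i}" for i in range(100_000)]
selected = sum(in_rollout_group(d, "2.1.0", 0.01) for d in fleet)
print(selected)  # close to 1,000
```

Because the bucket is a pure function of (device, version), widening the staged rollout from 1% to 10% is monotonic: every device already updated stays in the group, and only new devices join.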

Device-side implementation (C for constrained devices):

#include <stdio.h>
#include <mbedtls/sha256.h>
#include <esp_https_ota.h>

// Atomic update with rollback
typedef enum {
    PARTITION_A,
    PARTITION_B
} partition_t;

partition_t current_partition;
partition_t next_partition;

int perform_ota_update(const char* url, const char* expected_sha256) {
    esp_http_client_config_t config = {
        .url = url,
        .cert_pem = server_cert_pem_start,
        .timeout_ms = 10000,
    };
    
    esp_https_ota_config_t ota_config = {
        .http_config = &config,
    };
    
    esp_https_ota_handle_t ota_handle = NULL;
    
    // Step 1: Begin OTA (write to inactive partition)
    esp_err_t err = esp_https_ota_begin(&ota_config, &ota_handle);
    if (err != ESP_OK) {
        report_status("failed", "ota_begin_failed");
        return -1;
    }
    
    // Step 2: Download and verify in chunks
    report_status("downloading", NULL);
    while (1) {
        err = esp_https_ota_perform(ota_handle);
        if (err != ESP_ERR_HTTPS_OTA_IN_PROGRESS) {
            break;
        }
        // Print progress (optional for debugging via serial)
    }
    
    if (err != ESP_OK) {
        esp_https_ota_abort(ota_handle);
        report_status("failed", "download_failed");
        return -1;
    }
    
    // Step 3: Verify signature and hash
    esp_app_desc_t new_app_info;
    esp_https_ota_get_img_desc(ota_handle, &new_app_info);
    
    // Compare SHA256
    if (verify_sha256(ota_handle, expected_sha256) != 0) {
        esp_https_ota_abort(ota_handle);
        report_status("failed", "checksum_mismatch");
        return -1;
    }
    
    // Step 4: Finalize (mark partition as bootable)
    err = esp_https_ota_finish(ota_handle);
    if (err != ESP_OK) {
        report_status("failed", "ota_finish_failed");
        return -1;
    }
    
    report_status("success", NULL);
    
    // Step 5: Reboot to new partition
    // Bootloader will try new partition, fallback to old if boot fails
    esp_restart();
    
    return 0;
}

// Bootloader marks partition as valid after successful boot
void app_main() {
    // Verify this partition boots correctly
    const esp_partition_t *running = esp_ota_get_running_partition();
    esp_ota_img_states_t ota_state;
    
    esp_ota_get_state_partition(running, &ota_state);
    if (ota_state == ESP_OTA_IMG_PENDING_VERIFY) {
        // First boot after OTA, mark as valid
        esp_ota_mark_app_valid_cancel_rollback();
    }
    
    // Continue app initialization...
}

Key decisions:

  • Pull vs Push: Pull (device polls) scales better than push (server tracks all devices)
  • Delta updates: Reduce bandwidth (binary diff), but complex implementation
  • Dual-bank: Atomic updates with rollback, requires 2x flash size
  • Canary rollout: Critical for catching bugs early (1% β†’ 10% β†’ 100%)

MQTT Protocol Deep Dive

MQTT is the most popular IoT protocol because it's lightweight (2-byte minimum header) and uses a publish/subscribe model (decoupling producers from consumers).

MQTT Architecture

                    ┌─────────────────────┐
                    │   MQTT Broker       │
                    │   (VerneMQ/EMQ X)   │
                    └──────────┬──────────┘
                               │
          ┌────────────────────┼────────────────────┐
          │                    │                    │
    [Publisher A]         [Publisher B]      [Subscriber C]
     topic: sensors/       topic: sensors/    subscribe:
     temp/room1            humidity/room1     sensors/+/room1
          │                    │                    │
          └────────────────────┴────────────────────┘
                         Shared bus
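Subscriber C's filter sensors/+/room1 matches both publishers because + matches exactly one topic level, while # matches all remaining levels. The matching rule can be sketched in a few lines (a simplified illustration; it skips spec edge cases such as $-prefixed system topics):

```python
def topic_matches(filter_: str, topic: str) -> bool:
    """Simplified MQTT topic-filter matching: + is one level, # is the rest."""
    f_parts = filter_.split("/")
    t_parts = topic.split("/")
    for i, f in enumerate(f_parts):
        if f == "#":           # multi-level wildcard: matches everything after
            return True
        if i >= len(t_parts):  # filter has more levels than the topic
            return False
        if f != "+" and f != t_parts[i]:
            return False
    return len(f_parts) == len(t_parts)

print(topic_matches("sensors/+/room1", "sensors/temp/room1"))      # True
print(topic_matches("sensors/+/room1", "sensors/humidity/room1"))  # True
print(topic_matches("sensors/+/room1", "sensors/temp/room2"))      # False
print(topic_matches("sensors/#", "sensors/temp/room1/raw"))        # True
```

Brokers implement this with a trie over topic levels rather than per-message string comparison, but the semantics are the same.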

QoS Levels

QoS 0 (At most once)           QoS 1 (At least once)           QoS 2 (Exactly once)
────────────────────           ─────────────────────           ────────────────────
Publisher ─PUBLISH─▶ Broker    Publisher ─PUBLISH─▶ Broker     Publisher ─PUBLISH─▶ Broker
          (no ack)                       ◀─PUBACK──                      ◀─PUBREC──
                                                                        ──PUBREL─▶
Fire and forget                May duplicate                             ◀─PUBCOMP─
                                                               No duplicates

When to use:

  • QoS 0: Telemetry (temperature every 10s) - mα»™t sample bα»‹ mαΊ₯t khΓ΄ng critical
  • QoS 1: Commands (turn on light) - duplicate OK (idempotent)
  • QoS 2: Billing events (charge customer) - no duplicates allowed

Performance impact:

import paho.mqtt.client as mqtt
import time

def benchmark_qos():
    client = mqtt.Client()
    client.connect("localhost", 1883)
    client.loop_start()  # Background network thread; wait_for_publish() needs it
    
    payload = b"x" * 100  # 100 bytes
    count = 10000
    
    # QoS 0
    start = time.time()
    for i in range(count):
        client.publish("benchmark/qos0", payload, qos=0)
    qos0_time = time.time() - start
    print(f"QoS 0: {count/qos0_time:.0f} msg/sec")
    
    # QoS 1
    start = time.time()
    for i in range(count):
        info = client.publish("benchmark/qos1", payload, qos=1)
        info.wait_for_publish()  # Wait for PUBACK
    qos1_time = time.time() - start
    print(f"QoS 1: {count/qos1_time:.0f} msg/sec")
    
    # QoS 2
    start = time.time()
    for i in range(count):
        info = client.publish("benchmark/qos2", payload, qos=2)
        info.wait_for_publish()  # Wait for PUBCOMP
    qos2_time = time.time() - start
    print(f"QoS 2: {count/qos2_time:.0f} msg/sec")
    
    # Results (typical):
    # QoS 0: ~100,000 msg/sec
    # QoS 1: ~30,000 msg/sec
    # QoS 2: ~10,000 msg/sec

Retained Messages & Last Will

Retained messages: Broker stores last value, new subscribers get it immediately.

# Publisher: Report device status
client.publish("devices/sensor123/status", 
               payload='{"online": true, "battery": 85}',
               retain=True,  # Store this value
               qos=1)

# New subscriber connects 10 minutes later
client.subscribe("devices/+/status")
# Immediately receives retained status for all devices

Last Will and Testament (LWT): the broker auto-publishes a message when a client disconnects unexpectedly.

client = mqtt.Client()

# Set LWT before connect
client.will_set(
    topic="devices/sensor123/status",
    payload='{"online": false}',
    qos=1,
    retain=True
)

client.connect("broker.example.com")
# If network drops or device crashes, broker auto-publishes LWT

Use case: Device heartbeat monitoring

type DeviceMonitor struct {
    redis *redis.Client
}

// MQTT subscriber for LWT messages
func (m *DeviceMonitor) OnMessage(topic string, payload []byte) {
    ctx := context.Background()
    deviceID := extractDeviceID(topic)  // devices/sensor123/status
    
    var status struct {
        Online  bool `json:"online"`
        Battery int  `json:"battery"`
    }
    json.Unmarshal(payload, &status)
    
    if !status.Online {
        // Device went offline
        m.redis.Set(ctx, fmt.Sprintf("device:%s:online", deviceID), "0", 0)
        
        // Trigger alert if offline > 5 minutes
        m.scheduleOfflineAlert(deviceID, 5*time.Minute)
    } else {
        // Device back online
        m.redis.Set(ctx, fmt.Sprintf("device:%s:online", deviceID), "1", 0)
        m.redis.Set(ctx, fmt.Sprintf("device:%s:last_seen", deviceID), time.Now().Unix(), 0)
    }
}

MQTT Broker Clustering

Challenge: Scale MQTT broker to handle 1M concurrent connections.

VerneMQ clustering:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  VerneMQ 1  │────▶│  VerneMQ 2  │────▶│  VerneMQ 3  │
│ (100k conn) │     │ (100k conn) │     │ (100k conn) │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                   │
       └───────────────────┴───────────────────┘
              Plumtree gossip (CRDT)

Config (vernemq.conf):

# Clustering
nodename = VerneMQ1@192.168.1.101
distributed_cookie = secret_cluster_cookie

# Join cluster
cluster.join = VerneMQ2@192.168.1.102

# Performance tuning
max_connections = 100000
max_inflight_messages = 100
max_message_size = 65536

# Persistent session in database (distributed)
metadata_plugin = vmq_swc
vmq_swc.db_backend = leveldb
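With max_connections = 100000 per node, some back-of-the-envelope sizing for the 1M-connection target. The per-connection memory footprint and spare-node count below are assumed figures for illustration; measure your own broker under load:

```python
import math

def cluster_sizing(target_connections: int,
                   conns_per_node: int = 100_000,
                   kib_per_conn: int = 50,       # assumed per-connection footprint
                   spare_nodes: int = 3) -> dict:  # headroom for failover / rolling restarts
    """Estimate node count and per-node connection memory for an MQTT cluster."""
    base = math.ceil(target_connections / conns_per_node)
    mem_gib_per_node = round(conns_per_node * kib_per_conn / 1024**2, 1)
    return {"nodes": base + spare_nodes, "mem_gib_per_node": mem_gib_per_node}

print(cluster_sizing(1_000_000))  # {'nodes': 13, 'mem_gib_per_node': 4.8}
```

The spare nodes matter: when one node dies, its 100k clients reconnect in a thundering herd, so the survivors need both connection slots and CPU headroom to absorb the TLS handshakes.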

Subscription routing: When a device subscribes on node A but the message is published on node B:

Device A ──subscribe("sensors/#")──▶ Node A
Device B ──publish("sensors/temp")─▶ Node B
                                        │
                Node A ◀────gossip──────┘
                   │
                   └────deliver──▶ Device A

Trade-off:

  • Shared subscriptions ($share/group/topic): Load balancing across consumers
  • Routing overhead: Inter-node traffic (minimize with local_only topics)

Telemetry Ingestion

High-Throughput Message Pipeline

Scenario: 100,000 devices x 1 message/sec = 100k msg/sec sustained load.

Architecture:

MQTT Broker (cluster)
      │
      ▼
┌─────────────────┐
│ Message Router  │  (Kafka topic: iot.telemetry)
│   Kafka/Kinesis │  - Partitions: 50
│                 │  - Replication: 3
└────────┬────────┘  - Retention: 7 days
         │
         ├──────────────────┬──────────────────┬─────────────────┐
         ▼                  ▼                  ▼                 ▼
    ┌────────┐        ┌─────────┐       ┌──────────┐     ┌──────────┐
    │ Stream │        │  Time-  │       │ Analytics│     │ Archival │
    │ Process│        │ Series  │       │ Engine   │     │ (S3)     │
    │ (Flink)│        │ Ingest  │       │ (Druid)  │     │          │
    └────────┘        └─────────┘       └──────────┘     └──────────┘

MQTT β†’ Kafka bridge:

package bridge

import (
    "context"
    "encoding/json"
    "strings"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
    "github.com/segmentio/kafka-go"
)

// Envelope written to Kafka for each MQTT message
type TelemetryEnvelope struct {
    DeviceID  string `json:"device_id"`
    Timestamp int64  `json:"timestamp"`
    Payload   []byte `json:"payload"`
    Topic     string `json:"topic"`
}

type MQTTKafkaBridge struct {
    mqttClient  mqtt.Client
    kafkaWriter *kafka.Writer
}

func NewBridge(mqttBroker, kafkaBroker string) *MQTTKafkaBridge {
    // Kafka writer with batching
    kafkaWriter := &kafka.Writer{
        Addr:         kafka.TCP(kafkaBroker),
        Topic:        "iot.telemetry",
        Balancer:     &kafka.Hash{},  // Partition by device ID
        BatchSize:    100,             // Batch 100 messages
        BatchTimeout: 10 * time.Millisecond,
        Compression:  kafka.Snappy,
        Async:        true,            // Don't block MQTT
    }
    
    // MQTT client
    opts := mqtt.NewClientOptions()
    opts.AddBroker(mqttBroker)
    opts.SetClientID("kafka-bridge")
    opts.SetCleanSession(false)
    opts.SetAutoReconnect(true)
    opts.SetMaxReconnectInterval(60 * time.Second)
    
    mqttClient := mqtt.NewClient(opts)
    if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())  // In production: retry with backoff
    }
    
    bridge := &MQTTKafkaBridge{
        mqttClient:  mqttClient,
        kafkaWriter: kafkaWriter,
    }
    
    // Subscribe to all telemetry
    mqttClient.Subscribe("telemetry/#", 0, bridge.onMessage)
    
    return bridge
}

func (b *MQTTKafkaBridge) onMessage(client mqtt.Client, msg mqtt.Message) {
    // Extract device ID from topic (telemetry/{device_id})
    parts := strings.Split(msg.Topic(), "/")
    if len(parts) != 2 {
        return
    }
    deviceID := parts[1]
    
    // Enrich with metadata
    envelope := TelemetryEnvelope{
        DeviceID:  deviceID,
        Timestamp: time.Now().UnixNano(),
        Payload:   msg.Payload(),
        Topic:     msg.Topic(),
    }
    
    bytes, _ := json.Marshal(envelope)
    
    // Write to Kafka (async, batched)
    b.kafkaWriter.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte(deviceID),  // Partition by device ID
            Value: bytes,
        },
    )
}

Backpressure Handling

Problem: Kafka cluster hiccup → messages buffer in memory → OOM.

Solution: Circuit breaker + adaptive rate limiting.

type AdaptiveRateLimiter struct {
    successCount int64
    errorCount   int64
    rate         int64          // Current allowed rate (msg/sec)
    maxRate      int64
    bucket       *rate.Limiter  // Token bucket (golang.org/x/time/rate)
    lastAdjust   time.Time
}

func (r *AdaptiveRateLimiter) Allow() bool {
    // Token bucket enforces the current rate
    if !r.bucket.Allow() {
        return false
    }
    
    // Adjust rate every 10 seconds
    if time.Since(r.lastAdjust) > 10*time.Second {
        r.adjustRate()
    }
    
    return true
}

func (r *AdaptiveRateLimiter) RecordSuccess() { atomic.AddInt64(&r.successCount, 1) }
func (r *AdaptiveRateLimiter) RecordError()   { atomic.AddInt64(&r.errorCount, 1) }

func (r *AdaptiveRateLimiter) adjustRate() {
    success := atomic.LoadInt64(&r.successCount)
    errors := atomic.LoadInt64(&r.errorCount)
    total := success + errors
    
    if total == 0 {
        return
    }
    
    errorRate := float64(errors) / float64(total)
    currentRate := atomic.LoadInt64(&r.rate)
    
    var newRate int64
    if errorRate > 0.05 {
        // Error rate > 5%, reduce by 20%
        newRate = int64(float64(currentRate) * 0.8)
    } else if errorRate < 0.01 {
        // Error rate < 1%, increase by 10%
        newRate = int64(float64(currentRate) * 1.1)
        if newRate > r.maxRate {
            newRate = r.maxRate
        }
    } else {
        newRate = currentRate
    }
    
    atomic.StoreInt64(&r.rate, newRate)
    r.bucket.SetLimit(rate.Limit(newRate))  // Apply the new rate to the token bucket
    atomic.StoreInt64(&r.successCount, 0)
    atomic.StoreInt64(&r.errorCount, 0)
    r.lastAdjust = time.Now()
    
    log.Printf("Adjusted rate: %d → %d msg/sec (error rate: %.2f%%)",
               currentRate, newRate, errorRate*100)
}

// Integration with bridge
func (b *MQTTKafkaBridge) onMessage(client mqtt.Client, msg mqtt.Message) {
    if !b.rateLimiter.Allow() {
        // Drop message (or buffer to disk)
        metrics.Increment("telemetry.dropped")
        return
    }
    
    err := b.kafkaWriter.WriteMessages(...)
    if err != nil {
        b.rateLimiter.RecordError()
    } else {
        b.rateLimiter.RecordSuccess()
    }
}

Protocol Comparison

Feature        MQTT                    HTTP                CoAP                 AMQP
─────────────  ──────────────────────  ──────────────────  ───────────────────  ──────────
Transport      TCP                     TCP                 UDP                  TCP
Overhead       2 bytes                 ~200 bytes          4 bytes              ~8 bytes
QoS            0/1/2                   None (app-level)    0/1 (CON)            0/1/2/3
Pub/Sub        Native                  Webhooks            Observe              Native
NAT/Firewall   Good (MQTT-SN for UDP)  Good                Poor (UDP)           Good
Battery        Excellent               Poor (HTTP/1.1)     Excellent            Good
Use Case       General IoT             Web-friendly        Constrained devices  Enterprise

Code example - CoAP client:

from coapthon.client.helperclient import HelperClient

# CoAP uses UDP, very lightweight
client = HelperClient(server=('coap.example.com', 5683))

# Publish telemetry (CON = confirmable, like QoS 1)
response = client.post('sensors/temp', 
                        payload='{"value": 23.5}',
                        timeout=5)
print(response.pretty_print())

# Observe (like MQTT subscribe)
client.observe('actuators/fan', callback=lambda r: print(r.payload))

Time-Series Data Storage

Storage Engine Comparison

InfluxDB (popular choice):

┌──────────────────────────────────────┐
│        InfluxDB Architecture         │
├──────────────────────────────────────┤
│  WAL (Write-Ahead Log)               │  ← Durability
├──────────────────────────────────────┤
│  Cache (in-memory writes)            │  ← Fast writes
├──────────────────────────────────────┤
│  TSM Files (Time-Structured Merge)   │  ← Columnar storage
│    - data/                           │
│      - 00001.tsm (1h block)          │
│      - 00002.tsm (1h block)          │
│    - index/                          │
│      - tag index (inverted)          │
└──────────────────────────────────────┘

Schema design:

-- Measurement = table name
-- Tags = indexed metadata (low cardinality)
-- Fields = actual values (not indexed)
-- Timestamp = automatic

-- Example: Store temperature readings
-- DON'T: use device_id as field (can't filter efficiently)
-- DO: use device_id as tag

INSERT temperature,device_id=sensor123,room=bedroom value=23.5

-- Query by tag (fast, uses index)
SELECT value FROM temperature 
WHERE device_id='sensor123' 
  AND time > now() - 1h

-- Aggregate
SELECT MEAN(value) FROM temperature
WHERE room='bedroom'
  AND time > now() - 24h
GROUP BY time(10m), device_id

Cardinality explosion (common pitfall):

# BAD: High cardinality tag
# 1M devices × 1000 rooms = 1B tag combinations → slow
influx.write_points([{
    "measurement": "temperature",
    "tags": {
        "device_id": "sensor12345",  # 1M unique values
        "room": "bedroom_floor2_apt501"  # 1000 unique values
    },
    "fields": {"value": 23.5}
}])

# GOOD: Keep tags low cardinality, move high-cardinality to fields
influx.write_points([{
    "measurement": "temperature",
    "tags": {
        "building_id": "building_42",  # 100 buildings
        "floor": "2"                    # 20 floors
    },
    "fields": {
        "device_id": "sensor12345",  # Field, not indexed
        "value": 23.5
    }
}])

Write optimization (batching):

package ingestion

import (
    "log"
    "sync"
    "time"

    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
    "github.com/influxdata/influxdb-client-go/v2/api"
    "github.com/influxdata/influxdb-client-go/v2/api/write"
)

type TelemetryWriter struct {
    client   influxdb2.Client
    writeAPI api.WriteAPI
    buffer   []*write.Point
    mu       sync.Mutex
}

func NewTelemetryWriter(url, token, org, bucket string) *TelemetryWriter {
    client := influxdb2.NewClient(url, token)
    writeAPI := client.WriteAPI(org, bucket)
    
    // Handle async errors
    errorsCh := writeAPI.Errors()
    go func() {
        for err := range errorsCh {
            log.Printf("InfluxDB write error: %v", err)
        }
    }()
    
    return &TelemetryWriter{
        client:   client,
        writeAPI: writeAPI,
        buffer:   make([]*write.Point, 0, 1000),
    }
}

func (w *TelemetryWriter) WriteTelemetry(deviceID string, measurement string, value float64) {
    p := influxdb2.NewPointWithMeasurement(measurement).
        AddTag("device_id", deviceID).
        AddField("value", value).
        SetTime(time.Now())
    
    // Async write (batched automatically by client)
    w.writeAPI.WritePoint(p)
}

// For very high throughput, manual batching gives more control
func (w *TelemetryWriter) WriteBatch(points []*write.Point) error {
    // Write in chunks of 5000 (InfluxDB recommended batch size)
    const batchSize = 5000
    
    for i := 0; i < len(points); i += batchSize {
        end := i + batchSize
        if end > len(points) {
            end = len(points)
        }
        
        // WritePoint takes one point at a time; the client buffers and batches internally
        for _, p := range points[i:end] {
            w.writeAPI.WritePoint(p)
        }
    }
    
    // Flush to ensure all writes complete
    w.writeAPI.Flush()
    
    return nil
}

Retention & Downsampling

Problem: Raw data (1 sample/sec) = 86,400 points/day/device. 1M devices = 86.4B points/day → $$

Solution: Continuous queries for downsampling + retention policies.

-- Create retention policies
CREATE RETENTION POLICY "raw" ON "iot_db" DURATION 7d REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "downsampled_1h" ON "iot_db" DURATION 90d REPLICATION 1
CREATE RETENTION POLICY "downsampled_1d" ON "iot_db" DURATION 5y REPLICATION 1

-- Continuous query: auto-downsample raw data to 1h averages
CREATE CONTINUOUS QUERY "cq_downsample_1h" ON "iot_db"
BEGIN
  SELECT MEAN(value) AS value
  INTO "downsampled_1h"."temperature"
  FROM "raw"."temperature"
  GROUP BY time(1h), device_id
END

-- Continuous query: 1d averages
CREATE CONTINUOUS QUERY "cq_downsample_1d" ON "iot_db"
BEGIN
  SELECT MEAN(value) AS value
  INTO "downsampled_1d"."temperature"
  FROM "downsampled_1h"."temperature"
  GROUP BY time(1d), device_id
END

Storage savings:

Raw (7 days):          1M devices × 86,400 points/day × 7d  = 604.8B points
1h downsampled (90d):  1M devices × 24 points/day × 90d     = 2.16B points
1d downsampled (5y):   1M devices × 1 point/day × 1825d     = 1.83B points
──────────────────────────────────────────────────────────────────────
Total: ~609B points (vs ~158T points if storing raw for 5 years)
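The tiering arithmetic is worth double-checking in a few lines:

```python
DEVICES = 1_000_000
POINTS_PER_DAY_RAW = 86_400  # one sample per second

raw_7d = DEVICES * POINTS_PER_DAY_RAW * 7     # raw retention window
hourly = DEVICES * 24 * 90                    # 1h rollups kept 90 days
daily  = DEVICES * 1 * 1825                   # 1d rollups kept 5 years
raw_5y = DEVICES * POINTS_PER_DAY_RAW * 1825  # raw-only alternative

total = raw_7d + hourly + daily
print(f"total stored: {total:,} points")   # 608,785,000,000
print(f"raw-only 5y:  {raw_5y:,} points")  # 157,680,000,000,000
print(f"savings:      ~{raw_5y // total}x")
```

The downsampled tiers are essentially free next to the 7-day raw window: rollups add under 1% to total storage while extending queryable history from one week to five years.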

TimescaleDB Alternative

Postgres extension (better for relational queries):

-- Create hypertable (auto-partitioning by time)
CREATE TABLE telemetry (
    time        TIMESTAMPTZ NOT NULL,
    device_id   TEXT NOT NULL,
    metric      TEXT NOT NULL,
    value       DOUBLE PRECISION
);

SELECT create_hypertable('telemetry', 'time');

-- Create index on device_id (for device-specific queries)
CREATE INDEX ON telemetry (device_id, time DESC);

-- Compression (10x reduction typical)
ALTER TABLE telemetry SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'device_id',
    timescaledb.compress_orderby = 'time DESC'
);

-- Auto-compress chunks older than 7 days
SELECT add_compression_policy('telemetry', INTERVAL '7 days');

-- Continuous aggregate (like InfluxDB CQ)
CREATE MATERIALIZED VIEW telemetry_1h
WITH (timescaledb.continuous) AS
SELECT device_id,
       time_bucket('1 hour', time) AS bucket,
       AVG(value) as avg_value,
       MAX(value) as max_value,
       MIN(value) as min_value
FROM telemetry
GROUP BY device_id, bucket;

-- Refresh policy
SELECT add_continuous_aggregate_policy('telemetry_1h',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');

Query performance:

-- Query raw data (last hour)
EXPLAIN ANALYZE
SELECT time, value 
FROM telemetry 
WHERE device_id = 'sensor123' 
  AND time > NOW() - INTERVAL '1 hour';

-- Planning Time: 0.5ms
-- Execution Time: 12ms (3600 points)

-- Query aggregated data (last month)
EXPLAIN ANALYZE
SELECT bucket, avg_value
FROM telemetry_1h
WHERE device_id = 'sensor123'
  AND bucket > NOW() - INTERVAL '30 days';

-- Planning Time: 0.3ms
-- Execution Time: 3ms (720 points, pre-aggregated)
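A common pattern on top of these aggregates is a small query router that picks the cheapest table covering the requested range. A minimal sketch — the cutoffs mirror the retention policies above, and the daily rollup table name (`telemetry_1d`) is an assumption:

```python
from datetime import timedelta

def pick_table(query_range: timedelta) -> str:
    """Route a query to the cheapest resolution that still covers the range."""
    if query_range <= timedelta(days=7):
        return "telemetry"      # raw, full resolution
    elif query_range <= timedelta(days=90):
        return "telemetry_1h"   # hourly continuous aggregate
    else:
        return "telemetry_1d"   # daily rollup (assumed to exist alongside 1h)

print(pick_table(timedelta(hours=1)))   # telemetry
print(pick_table(timedelta(days=30)))   # telemetry_1h
print(pick_table(timedelta(days=365)))  # telemetry_1d
```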

Edge Computing

Edge Gateway Architecture

Why edge: Reduce bandwidth, lower latency, work offline.

                  Cloud
                    β–²
                    β”‚ (aggregated data)
         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
         β”‚                     β”‚
    Edge Gateway          Edge Gateway
    (Raspberry Pi)        (Industrial PC)
         β”‚                     β”‚
    β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”           β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”
    β”‚         β”‚           β”‚         β”‚
  [Sensor] [Sensor]   [PLC]      [Camera]
  (BLE)    (Zigbee)   (Modbus)   (RTSP)

Edge processing pipeline:

# Running on edge gateway (e.g., Raspberry Pi 4)
import asyncio
import json
import logging
import time
from collections import deque

import numpy as np
from azure.iot.device import Message
from azure.iot.device.aio import IoTHubModuleClient

log = logging.getLogger(__name__)
class EdgeTelemetryProcessor:
    def __init__(self):
        self.client = IoTHubModuleClient.create_from_edge_environment()
        self.buffer = deque(maxlen=1000)  # Local buffer
        self.anomaly_detector = AnomalyDetector()  # application-specific detector (not shown here)
    
    async def process_sensor_reading(self, device_id, value):
        # Step 1: Local buffering (survive network outage)
        self.buffer.append({
            'device_id': device_id,
            'value': value,
            'timestamp': time.time()
        })
        
        # Step 2: Local anomaly detection
        is_anomaly = self.anomaly_detector.detect(device_id, value)
        
        if is_anomaly:
            # Critical: send to cloud immediately
            await self.send_to_cloud({
                'device_id': device_id,
                'value': value,
                'alert': 'anomaly_detected',
                'priority': 'high'
            })
        else:
            # Normal: aggregate and batch
            # Only send summary every 5 minutes
            pass
    
    async def flush_aggregates(self):
        """Send aggregated data to cloud every 5 minutes"""
        while True:
            await asyncio.sleep(300)  # 5 minutes
            
            # Aggregate by device
            aggregates = {}
            for reading in list(self.buffer):
                device_id = reading['device_id']
                if device_id not in aggregates:
                    aggregates[device_id] = []
                aggregates[device_id].append(reading['value'])
            
            # Send summary
            for device_id, values in aggregates.items():
                summary = {
                    'device_id': device_id,
                    'count': len(values),
                    'mean': np.mean(values),
                    'min': np.min(values),
                    'max': np.max(values),
                    'std': np.std(values)
                }
                await self.send_to_cloud(summary)
            
            self.buffer.clear()
    
    async def send_to_cloud(self, data):
        try:
            msg = Message(json.dumps(data))
            await self.client.send_message_to_output(msg, "telemetryOutput")
        except Exception as e:
            # Network error, keep in buffer
            log.error(f"Failed to send to cloud: {e}")

Bandwidth savings:

Without edge aggregation:
100 sensors × 1 sample/sec × 60 sec × 100 bytes = 600 KB/min → 864 MB/day

With edge aggregation:
100 sensors × 1 summary/5min × 200 bytes = 20 KB/5min → 5.76 MB/day

Savings: ~99.3% bandwidth reduction

Edge-Cloud Sync

Azure IoT Edge (deployment.json):

{
  "modulesContent": {
    "$edgeAgent": {
      "properties.desired": {
        "modules": {
          "telemetryProcessor": {
            "version": "1.0",
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "acr.io/telemetry-processor:latest",
              "createOptions": {
                "HostConfig": {
                  "Binds": ["/data:/data"]
                }
              }
            }
          },
          "localDatabase": {
            "version": "1.0",
            "type": "docker",
            "status": "running",
            "settings": {
              "image": "timescale/timescaledb:latest"
            }
          }
        }
      }
    },
    "$edgeHub": {
      "properties.desired": {
        "routes": {
          "sensorToProcessor": "FROM /messages/modules/sensorModule/* INTO BrokeredEndpoint(\"/modules/telemetryProcessor/inputs/sensorInput\")",
          "processorToCloud": "FROM /messages/modules/telemetryProcessor/outputs/telemetryOutput INTO $upstream"
        },
        "storeAndForwardConfiguration": {
          "timeToLiveSecs": 604800
        }
      }
    }
  }
}

Offline-first design:

package edge

import (
    "database/sql"
    "fmt"
    "strings"
    "time"

    _ "github.com/mattn/go-sqlite3"
)

type OfflineBuffer struct {
    db *sql.DB
}

func NewOfflineBuffer() *OfflineBuffer {
    db, _ := sql.Open("sqlite3", "/data/offline_buffer.db")
    
    // Create table for buffered messages
    db.Exec(`CREATE TABLE IF NOT EXISTS messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp INTEGER,
        payload BLOB,
        retry_count INTEGER DEFAULT 0
    )`)
    
    return &OfflineBuffer{db: db}
}

func (b *OfflineBuffer) Store(payload []byte) error {
    _, err := b.db.Exec(
        "INSERT INTO messages (timestamp, payload) VALUES (?, ?)",
        time.Now().Unix(), payload,
    )
    return err
}

func (b *OfflineBuffer) Flush(cloudClient CloudClient) error {
    rows, err := b.db.Query(
        "SELECT id, payload FROM messages ORDER BY timestamp LIMIT 100",
    )
    if err != nil {
        return err
    }
    defer rows.Close()
    
    var successIDs []int64
    for rows.Next() {
        var id int64
        var payload []byte
        rows.Scan(&id, &payload)
        
        err := cloudClient.Send(payload)
        if err == nil {
            successIDs = append(successIDs, id)
        } else {
            // Increment retry count
            b.db.Exec("UPDATE messages SET retry_count = retry_count + 1 WHERE id = ?", id)
        }
    }
    
    // Delete successfully sent messages
    if len(successIDs) > 0 {
        placeholders := strings.Repeat("?,", len(successIDs)-1) + "?"
        query := fmt.Sprintf("DELETE FROM messages WHERE id IN (%s)", placeholders)
        
        args := make([]interface{}, len(successIDs))
        for i, id := range successIDs {
            args[i] = id
        }
        
        b.db.Exec(query, args...)
    }
    
    return nil
}

Device Twins / Shadows

Concept: Server-side representation of device state, accessible even when device offline.

Device (actual state)          Cloud (desired state)
─────────────────────          ─────────────────────
{                              {
  "firmware": "1.2.0",           "firmware": "1.3.0",  ← Cloud wants upgrade
  "config": {                    "config": {
    "interval": 60                 "interval": 30      ← Cloud wants faster reports
  }                              }
}                              }

Device syncs:
1. Receives desired state from cloud
2. Applies changes (upgrade firmware, update config)
3. Reports new actual state
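At the heart of this sync loop is computing the delta between desired and reported state. A minimal sketch of the idea (the real shadow services also track document versions and per-key metadata):

```python
def compute_delta(desired: dict, reported: dict) -> dict:
    """Return the desired keys whose values differ from what the device reported."""
    return {k: v for k, v in desired.items() if reported.get(k) != v}

desired = {"firmware": "1.3.0", "config": {"interval": 30}}
reported = {"firmware": "1.2.0", "config": {"interval": 60}}

print(compute_delta(desired, reported))
# {'firmware': '1.3.0', 'config': {'interval': 30}}
```

The device only needs to act on this delta, then report the new values back; once reported matches desired, the delta is empty and no further messages flow.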

AWS IoT Device Shadow

Shadow document structure:

{
  "state": {
    "desired": {
      "temperature_threshold": 25,
      "reporting_interval": 30,
      "led_brightness": 80
    },
    "reported": {
      "temperature_threshold": 20,
      "reporting_interval": 60,
      "led_brightness": 50,
      "firmware_version": "1.2.0",
      "battery": 75
    }
  },
  "metadata": {
    "desired": {
      "temperature_threshold": {
        "timestamp": 1678901234
      }
    },
    "reported": {
      "battery": {
        "timestamp": 1678901200
      }
    }
  },
  "version": 42,
  "timestamp": 1678901234
}

Device-side implementation (Python):

import json

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient

class DeviceController:
    def __init__(self, thing_name):
        self.shadow_client = AWSIoTMQTTShadowClient("device123")
        self.shadow_client.configureEndpoint("abc123.iot.us-west-2.amazonaws.com", 8883)
        self.shadow_client.configureCredentials(
            "root-ca.pem", "private.key", "certificate.pem"
        )
        self.shadow_client.connect()
        
        self.device_shadow = self.shadow_client.createShadowHandlerWithName(thing_name, True)
        
        # Listen for desired state changes
        self.device_shadow.shadowRegisterDeltaCallback(self.on_delta)
        
        # Initial sync: fetch the current shadow document
        self.device_shadow.shadowGet(self.on_shadow_update, 5)
    
    def on_delta(self, payload, responseStatus, token):
        """Called when cloud updates desired state"""
        delta = json.loads(payload)
        print(f"Received delta: {delta}")
        
        # Apply only the keys present in this delta
        applied = {}
        if "temperature_threshold" in delta["state"]:
            applied["temperature_threshold"] = delta["state"]["temperature_threshold"]
            self.update_config("temp_threshold", applied["temperature_threshold"])
        
        if "reporting_interval" in delta["state"]:
            applied["reporting_interval"] = delta["state"]["reporting_interval"]
            self.update_config("interval", applied["reporting_interval"])
        
        # Report back only what was actually applied
        if applied:
            self.report_state(applied)
    
    def report_state(self, state):
        """Report current device state to cloud"""
        payload = {
            "state": {
                "reported": state
            }
        }
        self.device_shadow.shadowUpdate(json.dumps(payload), self.on_shadow_update, 5)
    
    def on_shadow_update(self, payload, responseStatus, token):
        if responseStatus == "accepted":
            print("Shadow updated successfully")
        else:
            print(f"Shadow update failed: {responseStatus}")

Cloud-side: Update desired state (Go):

package shadow

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/iotdataplane"
)

type ShadowService struct {
    iot *iotdataplane.IoTDataPlane
}

func (s *ShadowService) UpdateDesiredState(thingName string, desired map[string]interface{}) error {
    payload := map[string]interface{}{
        "state": map[string]interface{}{
            "desired": desired,
        },
    }
    
    jsonPayload, _ := json.Marshal(payload)
    
    _, err := s.iot.UpdateThingShadow(&iotdataplane.UpdateThingShadowInput{
        ThingName: aws.String(thingName),
        Payload:   jsonPayload,
    })
    
    return err
}

// Example: Remotely configure all devices in a building
func (s *ShadowService) ConfigureBuilding(buildingID string, config map[string]interface{}) error {
    // Get all devices in building
    devices, err := s.registry.GetDevicesByBuilding(buildingID)
    if err != nil {
        return err
    }
    
    // Update shadows in parallel
    var wg sync.WaitGroup
    errCh := make(chan error, len(devices))
    
    for _, device := range devices {
        wg.Add(1)
        go func(deviceID string) {
            defer wg.Done()
            err := s.UpdateDesiredState(deviceID, config)
            if err != nil {
                errCh <- err
            }
        }(device.ID)
    }
    
    wg.Wait()
    close(errCh)
    
    // Check for errors
    var errors []error
    for err := range errCh {
        errors = append(errors, err)
    }
    
    if len(errors) > 0 {
        return fmt.Errorf("failed to update %d devices", len(errors))
    }
    
    return nil
}

Conflict Resolution

Problem: Device offline for 1 hour, during which cloud updates desired state 10 times. Device comes back online.

Strategy 1: Last-write-wins (LWW)

def resolve_conflict(reported, desired, metadata):
    # Simplified: real shadow metadata keeps a timestamp per key,
    # so production LWW compares timestamps key by key
    if metadata["reported"]["timestamp"] > metadata["desired"]["timestamp"]:
        # Device state is newer, keep reported
        return reported
    else:
        # Cloud state is newer, apply desired
        return desired

Strategy 2: Merge (for non-conflicting keys)

def merge_states(reported, desired):
    merged = reported.copy()
    
    for key, value in desired.items():
        if key not in reported:
            # New key from cloud, apply it
            merged[key] = value
        elif reported[key] != value:
            # Conflict, cloud wins
            merged[key] = value
    
    return merged
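Running the merge on a small example makes the behaviour concrete (a condensed copy of merge_states, so this snippet stands alone):

```python
def merge_states(reported, desired):
    """Merge: keep device-only keys, let cloud win on new keys and conflicts."""
    merged = reported.copy()
    for key, value in desired.items():
        if key not in reported or reported[key] != value:
            merged[key] = value  # new key from cloud, or conflict: cloud wins
    return merged

reported = {"interval": 60, "battery": 75}
desired = {"interval": 30, "led": 80}

print(merge_states(reported, desired))
# {'interval': 30, 'battery': 75, 'led': 80}
```

Note that `battery` (reported-only telemetry) survives untouched, which is exactly why merge beats blind replacement for shadows.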

Strategy 3: Vector clocks (advanced)

type VectorClock map[string]int

func (vc VectorClock) HappensBefore(other VectorClock) bool {
    strictlyLess := false
    
    for node, timestamp := range vc {
        otherTimestamp := other[node]
        if timestamp > otherTimestamp {
            return false  // vc saw an event that other hasn't
        }
        if timestamp < otherTimestamp {
            strictlyLess = true
        }
    }
    
    // Nodes present only in `other` also make vc strictly older
    for node, otherTimestamp := range other {
        if _, seen := vc[node]; !seen && otherTimestamp > 0 {
            strictlyLess = true
        }
    }
    
    return strictlyLess
}

// Usage in shadow
type DeviceShadow struct {
    State       map[string]interface{}
    VectorClock VectorClock
}

func (s *DeviceShadow) Merge(other *DeviceShadow) {
    if s.VectorClock.HappensBefore(other.VectorClock) {
        // Other is newer, replace
        s.State = other.State
        s.VectorClock = other.VectorClock
    } else if other.VectorClock.HappensBefore(s.VectorClock) {
        // Self is newer, keep current
        return
    } else {
        // Concurrent updates, manual merge required
        s.State = mergeConflict(s.State, other.State)
        s.VectorClock = mergeVectorClocks(s.VectorClock, other.VectorClock)
    }
}

Security

Device Authentication

Certificate-based (X.509) - industry standard:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Root CA     β”‚  (offline, stored in HSM)
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
       β”‚
β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”
β”‚ Intermediate β”‚  (signs device certs)
β”‚   CA         β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
       β”‚
       β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
       β–Ό          β–Ό          β–Ό          β–Ό
   [Device A] [Device B] [Device C] [Device D]
   (unique    (unique    (unique    (unique
    cert)      cert)      cert)      cert)

Certificate generation (offline, during manufacturing):

#!/bin/bash
# Generate device certificate

DEVICE_ID=$1

# Generate private key (stays on device, never transmitted)
openssl genrsa -out ${DEVICE_ID}.key 2048

# Generate CSR (Certificate Signing Request)
openssl req -new -key ${DEVICE_ID}.key -out ${DEVICE_ID}.csr \
    -subj "/CN=${DEVICE_ID}/O=MyCompany/C=US"

# Sign with intermediate CA
openssl x509 -req -in ${DEVICE_ID}.csr \
    -CA intermediate-ca.crt -CAkey intermediate-ca.key \
    -CAcreateserial -out ${DEVICE_ID}.crt \
    -days 3650 -sha256

# Embed cert + root CA in device firmware
cat ${DEVICE_ID}.crt intermediate-ca.crt root-ca.crt > ${DEVICE_ID}-chain.pem

Device-side TLS connection (C):

#include <mbedtls/ssl.h>
#include <mbedtls/net_sockets.h>
#include <mbedtls/entropy.h>
#include <mbedtls/ctr_drbg.h>

int connect_to_broker() {
    mbedtls_ssl_context ssl;
    mbedtls_ssl_config conf;
    mbedtls_x509_crt cacert, clicert;
    mbedtls_pk_context pkey;
    
    // Load device certificate and private key
    mbedtls_x509_crt_parse_file(&clicert, "/certs/device.crt");
    mbedtls_pk_parse_keyfile(&pkey, "/certs/device.key", NULL);
    
    // Load root CA
    mbedtls_x509_crt_parse_file(&cacert, "/certs/root-ca.crt");
    
    // Configure SSL
    mbedtls_ssl_config_defaults(&conf,
        MBEDTLS_SSL_IS_CLIENT,
        MBEDTLS_SSL_TRANSPORT_STREAM,
        MBEDTLS_SSL_PRESET_DEFAULT);
    
    mbedtls_ssl_conf_ca_chain(&conf, &cacert, NULL);
    mbedtls_ssl_conf_own_cert(&conf, &clicert, &pkey);
    
    // Verify server certificate
    mbedtls_ssl_conf_authmode(&conf, MBEDTLS_SSL_VERIFY_REQUIRED);
    
    // Connect
    mbedtls_net_context server_fd;
    mbedtls_net_connect(&server_fd, "mqtt.example.com", "8883", MBEDTLS_NET_PROTO_TCP);
    
    mbedtls_ssl_setup(&ssl, &conf);
    mbedtls_ssl_set_hostname(&ssl, "mqtt.example.com");  // required so the server name is actually verified
    mbedtls_ssl_set_bio(&ssl, &server_fd, mbedtls_net_send, mbedtls_net_recv, NULL);
    
    // TLS handshake
    int ret = mbedtls_ssl_handshake(&ssl);
    if (ret != 0) {
        printf("TLS handshake failed: -0x%x\n", -ret);
        return -1;
    }
    
    // Verify peer certificate
    uint32_t flags = mbedtls_ssl_get_verify_result(&ssl);
    if (flags != 0) {
        printf("Certificate verification failed\n");
        return -1;
    }
    
    printf("TLS connection established\n");
    return 0;
}

Secure Boot

Purpose: Prevent malicious firmware from running.

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚         Secure Boot Chain              β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ 1. ROM Bootloader (immutable)          β”‚
β”‚    - Verify Stage 2 bootloader sig     β”‚
β”‚    - Public key burned in OTP fuses    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ 2. Stage 2 Bootloader                  β”‚
β”‚    - Verify app firmware signature     β”‚
β”‚    - Check rollback protection         β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ 3. Application Firmware                β”‚
β”‚    - Run only if signature valid       β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

ESP32 secure boot (example):

// Bootloader verification (in Stage 2)
#include "esp_secure_boot.h"

esp_err_t verify_and_boot() {
    const esp_partition_t *partition = esp_partition_find_first(
        ESP_PARTITION_TYPE_APP, ESP_PARTITION_SUBTYPE_ANY, NULL);
    
    // Read app image
    esp_image_metadata_t metadata;
    esp_err_t err = esp_image_verify(ESP_IMAGE_VERIFY, 
                                      partition->address, 
                                      &metadata);
    
    if (err != ESP_OK) {
        // Signature verification failed
        ESP_LOGE("boot", "Invalid firmware signature");
        
        // Try backup partition
        partition = esp_partition_find_first(
            ESP_PARTITION_TYPE_APP, ESP_PARTITION_SUBTYPE_APP_OTA_1, NULL);
        
        err = esp_image_verify(ESP_IMAGE_VERIFY, 
                                partition->address, 
                                &metadata);
        
        if (err != ESP_OK) {
            // Both partitions invalid, halt
            esp_restart();
        }
    }
    
    // Check rollback version (prevent downgrade attacks)
    if (metadata.secure_version < get_secure_version()) {
        ESP_LOGE("boot", "Firmware version too old (rollback protection)");
        esp_restart();
    }
    
    // Boot into verified app
    esp_image_load(partition->address);
    
    return ESP_OK;
}

Encryption at Rest

Problem: Device stolen, attacker extracts flash memory.

Solution: Flash encryption (hardware-accelerated):

// Enable flash encryption (ESP32) — illustrative sketch; in practice flash
// encryption is enabled via the bootloader and menuconfig, not at runtime
esp_err_t enable_flash_encryption() {
    // Generate 256-bit AES key in eFuse (one-time programmable)
    esp_flash_encryption_init();
    
    // Encrypt all partitions
    const esp_partition_t *partition = esp_partition_find_first(
        ESP_PARTITION_TYPE_APP, ESP_PARTITION_SUBTYPE_ANY, NULL);
    
    esp_flash_encrypt_region(partition->address, partition->size);
    
    // Lock eFuse (key cannot be read out)
    esp_efuse_write_key(EFUSE_BLK_KEY0, ESP_EFUSE_KEY_PURPOSE_FLASH_ENCRYPTION);
    
    return ESP_OK;
}

// Data is automatically decrypted on read, encrypted on write
// Attacker reading raw flash gets only encrypted data

Cloud-side: Encryption at rest (AWS IoT):

// Store device credentials encrypted
import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    "github.com/aws/aws-sdk-go/service/kms"
)

func storeDeviceSecret(deviceID, secret string) error {
    // Encrypt with KMS
    kmsClient := kms.New(session.Must(session.NewSession()))
    
    encrypted, err := kmsClient.Encrypt(&kms.EncryptInput{
        KeyId:     aws.String("arn:aws:kms:us-west-2:123456789:key/abcd-1234"),
        Plaintext: []byte(secret),
        EncryptionContext: map[string]*string{
            "device_id": aws.String(deviceID),
        },
    })
    
    if err != nil {
        return err
    }
    
    // Store encrypted blob in DynamoDB
    return dynamoDB.PutItem(&dynamodb.PutItemInput{
        TableName: aws.String("device_secrets"),
        Item: map[string]*dynamodb.AttributeValue{
            "device_id": {S: aws.String(deviceID)},
            "encrypted_secret": {B: encrypted.CiphertextBlob},
        },
    })
}

Analytics & Anomaly Detection

Stream Processing for Real-time Alerts

Apache Flink pipeline:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read from Kafka
FlinkKafkaConsumer<TelemetryEvent> consumer = new FlinkKafkaConsumer<>(
    "iot.telemetry",
    new TelemetrySchema(),
    properties
);

DataStream<TelemetryEvent> telemetry = env.addSource(consumer);

// Detect temperature anomalies
DataStream<Alert> alerts = telemetry
    .keyBy(event -> event.getDeviceId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new AnomalyDetector());

// Send alerts
alerts.addSink(new AlertSink());

env.execute("IoT Anomaly Detection");

Anomaly detector (statistical approach):

public class AnomalyDetector extends ProcessWindowFunction<TelemetryEvent, Alert, String, TimeWindow> {
    
    @Override
    public void process(String deviceId, Context context, Iterable<TelemetryEvent> events, Collector<Alert> out) {
        List<Double> values = new ArrayList<>();
        for (TelemetryEvent event : events) {
            values.add(event.getValue());
        }
        
        // Calculate statistics
        double mean = calculateMean(values);
        double stdDev = calculateStdDev(values, mean);
        if (stdDev == 0) {
            return;  // all values identical in this window, no outliers possible
        }
        
        // Check for outliers (Z-score > 3)
        for (TelemetryEvent event : events) {
            double zScore = Math.abs((event.getValue() - mean) / stdDev);
            
            if (zScore > 3.0) {
                out.collect(new Alert(
                    deviceId,
                    event.getTimestamp(),
                    event.getValue(),
                    "Statistical anomaly detected (Z-score: " + zScore + ")"
                ));
            }
        }
    }
}

Machine Learning for Predictive Maintenance

Use case: Dα»± Δ‘oΓ‘n motor failure dα»±a trΓͺn vibration patterns.

Training pipeline (Python + TensorFlow):

import tensorflow as tf
from tensorflow import keras
import pandas as pd
import numpy as np

# Load historical data (labeled: normal vs failure)
df = pd.read_csv('motor_vibration_data.csv')
# Columns: device_id, timestamp, vibration_x, vibration_y, vibration_z, temperature, label

# Feature engineering
def extract_features(window):
    """Extract features from 1-minute window"""
    return {
        'mean_vibration': np.mean(window[['vibration_x', 'vibration_y', 'vibration_z']].values),
        'std_vibration': np.std(window[['vibration_x', 'vibration_y', 'vibration_z']].values),
        'max_vibration': np.max(window[['vibration_x', 'vibration_y', 'vibration_z']].values),
        'fft_peak': get_fft_peak_frequency(window['vibration_x'].values),
        'temperature': np.mean(window['temperature'].values),
    }

# Build LSTM model
model = keras.Sequential([
    keras.layers.LSTM(64, input_shape=(60, 5), return_sequences=True),  # 60 timesteps
    keras.layers.Dropout(0.2),
    keras.layers.LSTM(32),
    keras.layers.Dropout(0.2),
    keras.layers.Dense(16, activation='relu'),
    keras.layers.Dense(1, activation='sigmoid')  # Failure probability
])

model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy', keras.metrics.AUC()])

# Train
history = model.fit(X_train, y_train, 
                     epochs=50, 
                     batch_size=32,
                     validation_data=(X_val, y_val))

# Export model
model.save('motor_failure_predictor.h5')

Inference at edge (TensorFlow Lite for microcontrollers):

# Convert to TFLite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# Deploy to edge device (C++)
# ...

Or inference in cloud (real-time):

import json

import numpy as np
import tensorflow as tf
from kafka import KafkaConsumer

model = tf.keras.models.load_model('motor_failure_predictor.h5')
consumer = KafkaConsumer('iot.telemetry', bootstrap_servers='localhost:9092')

buffer = []

for message in consumer:
    telemetry = json.loads(message.value)
    
    # Buffering: wait for 60 samples (1 minute @ 1Hz)
    buffer.append(telemetry)
    if len(buffer) < 60:
        continue
    
    # Build a (1, 60, 5) tensor: 60 timesteps × 5 per-sample features.
    # The feature order must match what the model was trained on.
    X = np.array([[[s['vibration_x'], s['vibration_y'], s['vibration_z'],
                    s['temperature'], s.get('fft_peak', 0.0)] for s in buffer]])
    
    # Predict
    failure_probability = model.predict(X)[0][0]
    
    if failure_probability > 0.8:
        # High risk, send alert (send_alert is an app-specific helper)
        send_alert(telemetry['device_id'], f"Predicted failure risk: {failure_probability:.2%}")
    
    buffer = buffer[1:]  # Sliding window

Scalability & Cost Optimization

Sharding Strategies

Problem: 10M devices, single MQTT broker can't handle.

Solution: Shard by device ID.

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚            Load Balancer (HAProxy)                  β”‚
β”‚  - Read device_id from MQTT CONNECT packet         β”‚
β”‚  - Hash(device_id) % num_shards β†’ shard            β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
             β”‚              β”‚             β”‚
             β–Ό              β–Ό             β–Ό
      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
      β”‚ Shard 1  β”‚   β”‚ Shard 2  β”‚  β”‚ Shard 3  β”‚
      β”‚ (VerneMQ)β”‚   β”‚ (VerneMQ)β”‚  β”‚ (VerneMQ)β”‚
      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
      device_id      device_id     device_id
      % 3 == 0       % 3 == 1      % 3 == 2

HAProxy config:

global
    log /dev/log local0
    maxconn 1000000

frontend mqtt_front
    bind *:1883
    mode tcp
    option tcplog
    
    # Extract device_id from MQTT CONNECT
    tcp-request inspect-delay 5s
    tcp-request content accept if { req.len gt 0 }
    
    # Hash-based backend selection
    use_backend shard_%[req.payload(0,0),mqtt_field_value(connect,client_id),xxh32,mod(3)]

backend shard_0
    mode tcp
    balance leastconn
    server mqtt1 10.0.1.10:1883 check

backend shard_1
    mode tcp
    balance leastconn
    server mqtt2 10.0.1.11:1883 check

backend shard_2
    mode tcp
    balance leastconn
    server mqtt3 10.0.1.12:1883 check

Resharding (inevitable as you scale):

# Gradual resharding to minimize disruption
import hashlib

def stable_hash(device_id):
    # Python's built-in hash() is salted per process; use a stable digest
    return int(hashlib.md5(device_id.encode()).hexdigest(), 16)

class ReshardController:
    def __init__(self, old_shards, new_shards):
        self.old_shards = old_shards
        self.new_shards = new_shards
        self.migration_percentage = 0
    
    def get_shard(self, device_id):
        # Deterministic migration: a given device always routes the same way
        # at a given percentage, so its MQTT session doesn't flap between shards
        bucket = stable_hash(device_id) % 100
        if bucket < self.migration_percentage * 100:
            return stable_hash(device_id) % self.new_shards
        else:
            return stable_hash(device_id) % self.old_shards
    
    def increase_migration(self, percentage):
        """Gradually increase traffic to new shards"""
        self.migration_percentage = min(1.0, percentage)
        
        # Update load balancer config
        self.update_haproxy_config()
    
    # Migration plan:
    # Day 1: 0% β†’ new shards (deploy new shards)
    # Day 2: 10% β†’ new shards
    # Day 3: 30% β†’ new shards
    # Day 4: 50% β†’ new shards
    # Day 5: 100% β†’ new shards (decommission old shards)
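One reason resharding is so disruptive with hash-mod-N routing: going from N to N+1 shards remaps most devices. A quick simulation with synthetic device IDs (with uniform hashing, mod 3 and mod 4 agree only 1 time in 4):

```python
import hashlib

def shard(device_id: str, n: int) -> int:
    # Stable hash, then mod-N shard assignment
    return int(hashlib.md5(device_id.encode()).hexdigest(), 16) % n

devices = [f"device-{i}" for i in range(10_000)]
moved = sum(1 for d in devices if shard(d, 3) != shard(d, 4))

print(f"moved: {moved / len(devices):.0%}")  # typically around 75%
```

Consistent hashing would move only ~1/N of devices instead, at the cost of a more complex load balancer layer.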

Cold Storage & Data Lifecycle

Tiered storage:

Hot Tier (0-7 days):     InfluxDB (SSD)        - Full-resolution
Warm Tier (7-90 days):   S3 + Athena           - 1h aggregates  
Cold Tier (90d-5y):      S3 Glacier            - 1d aggregates
Archive (>5y):           Delete or tape backup - Regulatory only
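The lifecycle table maps naturally onto a small policy function. A sketch using the tier boundaries above:

```python
from datetime import timedelta

def storage_tier(age: timedelta) -> str:
    """Mirror the tiered-storage table: hot -> warm -> cold -> archive."""
    if age <= timedelta(days=7):
        return "hot"      # InfluxDB (SSD), full resolution
    elif age <= timedelta(days=90):
        return "warm"     # S3 + Athena, 1h aggregates
    elif age <= timedelta(days=5 * 365):
        return "cold"     # S3 Glacier, 1d aggregates
    else:
        return "archive"  # delete or tape, regulatory only

print(storage_tier(timedelta(days=3)))    # hot
print(storage_tier(timedelta(days=400)))  # cold
```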

Auto-archival job:

package archival

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3"
    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

type ArchivalJob struct {
    influx influxdb2.Client
    s3     *s3.S3
}

func (j *ArchivalJob) ArchiveOldData() error {
    // Query data older than 7 days
    query := `
        from(bucket: "iot_telemetry")
        |> range(start: -14d, stop: -7d)
        |> aggregateWindow(every: 1h, fn: mean)
    `
    
    result, err := j.influx.QueryAPI("myorg").Query(context.Background(), query)
    if err != nil {
        return err
    }
    
    // Write to a Parquet file (createParquetWriter is an app-specific helper, not shown)
    parquetFile := createParquetWriter("telemetry_archive.parquet")
    
    for result.Next() {
        record := result.Record()
        parquetFile.Write(record)
    }
    
    parquetFile.Close()
    
    // Upload to S3
    file, _ := os.Open("telemetry_archive.parquet")
    _, err = j.s3.PutObject(&s3.PutObjectInput{
        Bucket: aws.String("iot-archive"),
        Key:    aws.String(fmt.Sprintf("year=%d/month=%d/telemetry.parquet", 
                                       time.Now().Year(), time.Now().Month())),
        Body:   file,
        StorageClass: aws.String("GLACIER"),  // Cheap long-term storage
    })
    
    // Delete from InfluxDB (Flux has no delete() function; use the client's Delete API)
    if err == nil {
        start := time.Now().AddDate(0, 0, -14)
        stop := time.Now().AddDate(0, 0, -7)
        err = j.influx.DeleteAPI().DeleteWithName(
            context.Background(), "myorg", "iot_telemetry", start, stop, "")
    }
    
    return err
}

Query archived data (Athena):

-- Create external table on S3
CREATE EXTERNAL TABLE telemetry_archive (
    device_id STRING,
    timestamp BIGINT,
    value DOUBLE
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET
LOCATION 's3://iot-archive/';

-- Query (billed by data scanned, cheap if partitioned well)
SELECT device_id, AVG(value) as avg_value
FROM telemetry_archive
WHERE year = 2025 AND month = 3 
  AND device_id = 'sensor123'
GROUP BY device_id;

Cost Optimization Techniques

1. Protocol efficiency:

HTTP POST:      ~500 bytes overhead (headers)
MQTT PUBLISH:   ~2 bytes fixed header (plus a short topic name)

1M devices Γ— 1 msg/min Γ— 500 bytes Γ— 60 min Γ— 24h Γ— 30d = 21.6 TB/month (HTTP)
1M devices Γ— 1 msg/min Γ— 2 bytes Γ— 60 min Γ— 24h Γ— 30d = 86 GB/month (MQTT)

Data transfer cost (AWS): $0.09/GB
HTTP: $1,944/month
MQTT: $7.74/month
Savings: 99.6%
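
The overhead arithmetic above can be double-checked in a few lines of Python, using the same assumptions from the text (1M devices, one message per minute, $0.09/GB egress):

```python
# Reproduce the protocol-overhead cost math above.
# Assumptions (from the text): 1M devices, 1 msg/min, $0.09/GB egress.
DEVICES = 1_000_000
MSGS_PER_MONTH = 60 * 24 * 30  # one message per minute for 30 days
PRICE_PER_GB = 0.09

def monthly_overhead_cost(overhead_bytes: int) -> float:
    """Monthly transfer cost attributable to per-message protocol overhead."""
    total_gb = DEVICES * MSGS_PER_MONTH * overhead_bytes / 1e9
    return total_gb * PRICE_PER_GB

print(f"HTTP: ${monthly_overhead_cost(500):,.0f}/month")  # ~$1,944
print(f"MQTT: ${monthly_overhead_cost(2):,.2f}/month")    # ~$7.78
```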

2. Compression:

import zlib
import json

# Raw telemetry
data = {
    "device_id": "sensor123",
    "timestamp": 1678901234,
    "temperature": 23.5,
    "humidity": 65.2,
    "pressure": 1013.25
}
raw = json.dumps(data).encode()
print(f"Raw: {len(raw)} bytes")  # ~120 bytes

# Compressed
compressed = zlib.compress(raw)
print(f"Compressed: {len(compressed)} bytes")  # ~80 bytes
print(f"Ratio: {len(compressed)/len(raw):.1%}")  # 66.7%

# For time-series, binary formats even better
import struct
binary = struct.pack('>Qfff', 
                     data['timestamp'],
                     data['temperature'],
                     data['humidity'],
                     data['pressure'])
print(f"Binary: {len(binary)} bytes")  # 20 bytes β†’ 83% reduction

3. Batching:

# Instead of sending 60 individual messages (1/min for 1 hour)
# Send 1 batch message every hour

# Individual messages
for i in range(60):
    client.publish(topic, json.dumps({"value": values[i]}))
# Cost: 60 Γ— $0.000001/msg = $0.00006

# Batched
batch = {"values": values, "interval": 60}
client.publish(topic, json.dumps(batch))
# Cost: 1 Γ— $0.000001/msg = $0.000001
# Savings: 98.3%

4. Sampling:

# Adaptive sampling: send more data when changing, less when stable
class AdaptiveSampler:
    def __init__(self, threshold=0.5):
        self.last_value = None
        self.threshold = threshold
    
    def should_send(self, value):
        if self.last_value is None:
            self.last_value = value
            return True
        
        # Send if change > threshold
        if abs(value - self.last_value) > self.threshold:
            self.last_value = value
            return True
        
        return False

# Example: Temperature stable around 23Β°C for 1 hour (60 readings)
sampler = AdaptiveSampler(threshold=0.5)
for temp in readings:  # e.g. [23.0, 23.1, 23.0, 23.2, ...] hovering near 23Β°C
    if sampler.should_send(temp):
        client.publish("temp", str(temp))

# Result: Only 2-3 messages sent (when temp changes > 0.5Β°C)
# Instead of 60 messages
# Savings: 95%
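
One caveat with a pure deadband sampler: a stable value and a dead device look identical upstream. A common refinement, sketched below with illustrative parameters (`max_interval_s` is an assumption, not part of the original example), adds a maximum reporting interval as a heartbeat:

```python
import time

class HeartbeatSampler:
    """Deadband sampler that also sends a periodic heartbeat reading,
    so a silent device can be distinguished from a stable one."""

    def __init__(self, threshold=0.5, max_interval_s=900):
        self.threshold = threshold
        self.max_interval_s = max_interval_s
        self.last_value = None
        self.last_sent = 0.0

    def should_send(self, value, now=None):
        now = time.monotonic() if now is None else now
        changed = (self.last_value is None
                   or abs(value - self.last_value) > self.threshold)
        stale = (now - self.last_sent) >= self.max_interval_s
        if changed or stale:
            self.last_value = value
            self.last_sent = now
            return True
        return False
```

With this, a thermostat that sits at 23Β°C all day still reports every `max_interval_s` seconds, which also keeps the device-shadow TTL from expiring a healthy device.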

Interview Questions

Q1: Design an IoT platform for 10 million smart thermostats

Requirements:

  • 10M devices, each sends temperature every 60 seconds
  • Users can view current temp and historical graphs
  • Users can set target temperature remotely
  • 99.9% uptime

Calculation:

Traffic:
- Telemetry: 10M devices Γ· 60s β‰ˆ 167k msg/sec
- Commands: ~1k/sec (users adjusting temp)
- Queries: ~10k/sec (users viewing dashboards)

Data:
- Per message: 50 bytes (device_id, timestamp, temp)
- Per day: 166,667 msg/sec Γ— 50 bytes Γ— 86,400 sec β‰ˆ 720 GB/day
- Per year: β‰ˆ 263 TB/year

Architecture:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                      Devices (10M)                         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                      β”‚ MQTT/TLS
                      β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  MQTT Broker Cluster (VerneMQ, 100 nodes, sharded)        β”‚
β”‚  - 100k connections/node                                   β”‚
β”‚  - QoS 0 for telemetry (fire-and-forget)                  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                      β”‚
                      β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Kafka (50 partitions, 3x replication)                     β”‚
β”‚  - Topic: telemetry (retention: 7 days)                    β”‚
β”‚  - Topic: commands (retention: 30 days)                    β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
       β”‚                      β”‚
       β–Ό                      β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Stream     β”‚      β”‚   Device Shadow Serviceβ”‚
β”‚  Processor  β”‚      β”‚   (Redis, device state)β”‚
β”‚  (Flink)    β”‚      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚  - Anomaly  β”‚                  β”‚
β”‚  - Alerts   β”‚                  β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜                  β”‚
       β”‚                         β”‚
       β–Ό                         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  InfluxDB      β”‚      β”‚  PostgreSQL     β”‚
β”‚  (time-series) β”‚      β”‚  (metadata,     β”‚
β”‚  - Raw: 7d     β”‚      β”‚   users, auth)  β”‚
β”‚  - 1h: 90d     β”‚      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚  - 1d: 5y      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  API Gateway (GraphQL/REST)                                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                      β”‚
                      β–Ό
                  [Web/Mobile App]

Key decisions:

  1. MQTT QoS 0: Temperature readings; one lost sample is not critical
  2. Sharding: 100 MQTT brokers Γ— 100k connections = 10M
  3. Kafka: Decouples ingestion from processing, and is replayable
  4. Redis for shadows: Fast reads (current temp), TTL for offline devices
  5. InfluxDB downsampling: Raw data is expensive; downsample after 7 days
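
Decision 4 can be made concrete with a small in-memory sketch of shadow semantics (desired vs. reported state, plus a staleness TTL). A production version would back this with Redis as in the diagram; all names here are illustrative:

```python
import time

class ShadowStore:
    """In-memory sketch of device-shadow semantics. Production would use
    Redis hashes with EXPIRE instead of a dict."""

    def __init__(self, ttl_s=300):
        self.ttl_s = ttl_s
        self.shadows = {}  # device_id -> {"desired", "reported", "ts"}

    def report(self, device_id, state):
        """Device publishes its current (reported) state."""
        s = self.shadows.setdefault(device_id, {"desired": {}, "reported": {}})
        s["reported"].update(state)
        s["ts"] = time.time()

    def set_desired(self, device_id, state):
        """User sets a target (desired) state, e.g. target temperature."""
        s = self.shadows.setdefault(device_id, {"desired": {}, "reported": {}})
        s["desired"].update(state)

    def delta(self, device_id):
        """Keys where desired differs from reported -> commands to push."""
        s = self.shadows.get(device_id, {})
        d, r = s.get("desired", {}), s.get("reported", {})
        return {k: v for k, v in d.items() if r.get(k) != v}

    def is_online(self, device_id):
        """A device is 'online' if it reported within the TTL window."""
        s = self.shadows.get(device_id)
        return bool(s) and (time.time() - s.get("ts", 0)) < self.ttl_s
```

The delta is what the platform sends down as a command when a user adjusts the target temperature while the device is connected.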

Bottlenecks & scaling:

  • MQTT broker: Horizontal scaling vα»›i sharding
  • Kafka: Increase partitions (50 β†’ 100)
  • InfluxDB: Clustering (Enterprise) hoαΊ·c vΓ o TimescaleDB
  • API: Stateless, scale horizontally

Q2: How do you handle firmware OTA for 1M devices without bricking them?

Strategy:

1. Canary rollout (1% β†’ 10% β†’ 100%)
2. Health checks (post-update boot verification)
3. Automatic rollback (bootloader dual-partition)
4. Gradual rollout with kill switch

Implementation:

import random
import time

class OTAController:
    def rollout_firmware(self, version, target_devices):
        # Phase 1: Canary (1% = 10,000 devices)
        canary_devices = random.sample(target_devices, 10_000)
        self.update_devices(canary_devices, version)
        
        # Monitor for 24 hours
        time.sleep(86400)
        
        # Check failure rate
        failure_rate = self.get_failure_count(version) / 10_000
        if failure_rate > 0.05:  # 5% failure threshold
            # Abort rollout (kill switch)
            self.pause_rollout(version)
            self.alert_oncall(f"High failure rate: {failure_rate:.1%}")
            return
        
        # Phase 2: Staged (10% = 100k devices, excluding the canary)
        remaining = list(set(target_devices) - set(canary_devices))
        staged_devices = random.sample(remaining, 100_000)
        self.update_devices(staged_devices, version)
        
        time.sleep(43200)  # Monitor for 12 hours
        
        failure_rate = self.get_failure_count(version) / 100_000
        if failure_rate > 0.02:
            self.pause_rollout(version)
            return
        
        # Phase 3: General availability (remaining ~890k devices)
        # Gradual rollout over 7 days to avoid a thundering herd
        remaining = list(set(remaining) - set(staged_devices))
        batch_size = -(-len(remaining) // 7)  # ceiling division
        for day in range(7):
            batch = remaining[day * batch_size : (day + 1) * batch_size]
            self.update_devices(batch, version)
            time.sleep(86400)

Device-side health check:

void app_main() {
    // Check if this is first boot after OTA
    esp_ota_img_states_t state;
    const esp_partition_t *running = esp_ota_get_running_partition();
    esp_ota_get_state_partition(running, &state);
    
    if (state == ESP_OTA_IMG_PENDING_VERIFY) {
        // Perform health checks
        bool wifi_ok = test_wifi_connection();
        bool mqtt_ok = test_mqtt_connection();
        bool sensors_ok = test_all_sensors();
        
        if (wifi_ok && mqtt_ok && sensors_ok) {
            // Mark firmware as valid
            esp_ota_mark_app_valid_cancel_rollback();
            report_ota_success();
        } else {
            // Health check failed, reboot to rollback
            ESP_LOGE("OTA", "Health check failed, rolling back");
            esp_restart();  // Bootloader will boot previous partition
        }
    }
    
    // Normal operation...
}

Q3: Optimize cost: 1M devices sending 1 msg/min costs $50k/month. Reduce to $5k.

Current cost breakdown:

Assumptions:
- 1M devices Γ— 60 msg/hour Γ— 24h = 1.44B msg/day
- MQTT broker: AWS IoT Core β‰ˆ $1 per 1M messages
- Data transfer: $0.09/GB, assuming ~1 KB on the wire per message
  (100-byte payload plus TCP/TLS/MQTT framing, acks, and reconnect handshakes)
- Storage: InfluxDB Cloud = $0.50/GB-month

Costs:
- Messages: 1.44B msg/day Γ— 30d Γ— $1/1M = $43,200/month
- Data transfer: 1.44B msg/day Γ— 30d Γ— 1 KB Γ— $0.09/GB β‰ˆ $3,888/month
- Storage: 1.44B msg/day Γ— 100 bytes Γ— 30d = 4.32 TB Γ— $0.50 = $2,160/month
Total: β‰ˆ $49,250/month

Optimization 1: Edge aggregation (reduce messages by 60x)

# Instead of 60 individual messages/hour, send 1 aggregate
import json
import numpy as np

class EdgeAggregator:
    def __init__(self):
        self.buffer = []
    
    def add_reading(self, value):
        self.buffer.append(value)
        
        if len(self.buffer) >= 60:  # 1 hour of data
            self.send_aggregate()
    
    def send_aggregate(self):
        msg = {
            "count": len(self.buffer),
            "mean": np.mean(self.buffer),
            "min": np.min(self.buffer),
            "max": np.max(self.buffer)
        }
        client.publish("telemetry", json.dumps(msg))
        self.buffer = []

# New cost:
# Messages: 1.44B / 60 = 24M msg/day Γ— 30d Γ— $1/1M = $720/month
# Savings: $42,480/month

Optimization 2: Compression (reduce data transfer by 70%)

import zlib

compressed_msg = zlib.compress(json.dumps(msg).encode())

# New cost: data transfer drops to ~30% of the pre-compression bill
# (and compression also saves device-side bandwidth and battery)

Optimization 3: Self-hosted MQTT broker (eliminate per-message cost)

Deploy VerneMQ on EC2:
- 10Γ— c5.2xlarge instances ($0.34/hour Γ— 10 Γ— 730h = $2,482/month)
- Handle 1M concurrent connections
- Zero per-message cost

Savings: $43,200 - $2,482 = $40,718/month

Optimization 4: Downsampling & cold storage

-- Keep raw data for 7 days only
CREATE RETENTION POLICY "raw" ON "iot_db" DURATION 7d REPLICATION 1 DEFAULT

-- 1h aggregates for 90 days (60x size reduction)
-- 1d aggregates for 5 years (1440x size reduction)

-- New storage:
Raw (7d): 4.32 TB / 30 Γ— 7 = 1.01 TB
1h (90d): 1.01 TB / 60 Γ— 90/7 = 0.22 TB
1d (5y): 0.22 TB / 24 Γ— 1825/90 = 0.19 TB
Total: 1.42 TB

Cost: 1.42 TB Γ— $0.50 = $710/month
Savings: $1,450/month
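
The tier arithmetic above can be sanity-checked in a few lines (the text rounds intermediate steps, so this lands within a hundredth of a TB of the 1.42 TB figure):

```python
# Back-of-envelope check of the retention tiers above.
# Assumption: 4.32 TB of raw per-minute data per 30 days; hourly rollups
# are 1/60 the volume of raw, daily rollups 1/1440.
def tier_sizes_tb(raw_tb_per_month: float = 4.32) -> dict:
    per_day = raw_tb_per_month / 30          # 0.144 TB/day of raw points
    return {
        "raw_7d": per_day * 7,               # ~1.01 TB
        "hourly_90d": per_day / 60 * 90,     # ~0.22 TB
        "daily_5y": per_day / 1440 * 1825,   # ~0.18 TB
    }

tiers = tier_sizes_tb()
print(tiers, sum(tiers.values()))  # total ~1.4 TB, vs unbounded raw growth
```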

Total optimized cost:

Broker: $2,482/month (self-hosted VerneMQ; the per-message bill goes away)
Data transfer: β‰ˆ $1,166/month (edge aggregation + compression)
Storage: $710/month (downsampling)
────────────────────────────────────
Total: β‰ˆ $4,360/month

Original: β‰ˆ $49,250/month
Optimized: β‰ˆ $4,360/month
Savings: β‰ˆ 91% (β‰ˆ $44,900/month)

(Note the levers overlap: once the broker is self-hosted there is no
per-message fee, so edge aggregation pays off in transfer volume and
broker sizing rather than as a separate line item.)

Summary

Key points to remember:

  1. Scale = sharding: You cannot scale vertically; shard MQTT brokers and databases
  2. Constrained devices: Battery, bandwidth, memory β†’ optimize protocols (MQTT over HTTP), compression, edge processing
  3. Reliability: QoS levels, device shadows, offline buffering, dual-partition OTA
  4. Security: Certificate-based auth, TLS, secure boot, flash encryption
  5. Cost: Edge aggregation, compression, batching, downsampling, cold storage
  6. Time-series DB: InfluxDB/TimescaleDB are specialized for IoT workloads
  7. Real-time: Stream processing (Flink/Kafka) for alerts and analytics
  8. Edge computing: Reduces bandwidth (up to 99%), lowers latency, works offline

Common pitfalls:

  • Cardinality explosion: Using high-cardinality values as tags in InfluxDB
  • Thundering herd: All devices reconnecting at once after a network outage
  • No rollback: OTA updates without a fallback partition β†’ bricked devices
  • Stateful brokers: Connection state lost when a broker restarts
  • Ignoring offline: Devices will go offline; you need offline buffering

Trade-offs to discuss in interviews:

| Decision   | Option A         | Option B          | When to use A                            | When to use B                    |
|------------|------------------|-------------------|------------------------------------------|----------------------------------|
| Protocol   | MQTT             | HTTP              | Constrained devices, constant connection | Web-friendly, firewall traversal |
| QoS        | 0 (at most once) | 1 (at least once) | Telemetry (lossy OK)                     | Commands (must arrive)           |
| Storage    | InfluxDB         | TimescaleDB       | Pure time-series                         | Need relational joins            |
| Processing | Edge             | Cloud             | Low latency, bandwidth limited           | Complex analytics, ML            |
| Sharding   | By device ID     | By geography      | Even distribution                        | Data locality                    |

References

Books:

  • "Designing Data-Intensive Applications" - Martin Kleppmann (Chapter 11: Stream Processing)
  • "Building the Internet of Things" - Maciej Kranz
  • "IoT Inc" - Bruce Sinclair
