Domains | Khoa | 19/04/2026 | 46 min read

Domain: Healthcare Systems — HIPAA, EHR, HL7/FHIR

Healthcare IT is one of the most complex domains in software engineering: the systems are life-critical, and they must also comply with a long list of strict data-protection regulations. A bug here does not just crash a web page; it can endanger a patient's life or violate HIPAA, with penalties of up to $50,000 per violation and up to $1.5M per year for repeated violations of the same provision (statutory figures, before annual inflation adjustments).

This section describes how healthcare systems such as Epic, Cerner, Allscripts, or the HIS (Hospital Information System) platforms used in Vietnam actually work, along with the interoperability, compliance, and security challenges that senior engineers face.


1. Healthcare IT Landscape: Context and Overall Architecture

1.1 Main Components of a Healthcare System

┌─────────────────────────────────────────────────────────────────────┐
│                         Patient Portal                              │
│                   (Web/Mobile, Patient-facing)                      │
└──────────────────────────────┬──────────────────────────────────────┘
                               │ HTTPS/FHIR
                               ▼
┌──────────────────────────────────────────────────────────────────────┐
│                       API Gateway / HIE                              │
│              (Health Information Exchange/Interoperability)          │
└───────┬──────────────┬─────────────┬────────────────┬───────────────┘
        │              │             │                │
        ▼              ▼             ▼                ▼
   ┌────────┐    ┌─────────┐   ┌────────┐      ┌──────────┐
   │  EHR   │    │  PACS   │   │  LIS   │      │   RIS    │
   │(Epic,  │    │(Imaging)│   │ (Lab)  │      │(Radiology│
   │Cerner) │    │         │   │        │      │  Info)   │
   └────┬───┘    └────┬────┘   └───┬────┘      └────┬─────┘
        │             │            │                 │
        └─────────────┴────────────┴─────────────────┘
                               │
                               ▼
                    ┌────────────────────┐
                    │  Integration Engine│
                    │  (Mirth, Rhapsody) │
                    │   HL7 v2/FHIR      │
                    └────────────────────┘

Key Components:

  • EHR (Electronic Health Record): Stores the complete electronic medical record
  • PACS (Picture Archiving and Communication System): Stores and manages medical images (X-ray, CT, MRI)
  • LIS (Laboratory Information System): Manages lab test results
  • RIS (Radiology Information System): Manages appointments and diagnostic imaging reports
  • HIE (Health Information Exchange): Exchanges data between healthcare organizations
  • Integration Engine: Middleware that transforms and routes messages (HL7/FHIR)
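To make the integration engine's routing role concrete, here is a minimal sketch of reading the MSH header of an HL7 v2 message and picking a destination. The routing table, the `DEAD_LETTER` fallback, and the sample message are illustrative assumptions, not the behavior of Mirth or Rhapsody; production engines also handle escaping, repetitions, and acknowledgements.

```python
from typing import Dict

# Hypothetical routing table: message type (MSH-9, first component) -> target system.
ROUTES: Dict[str, str] = {
    "ADT": "EHR",  # admit/discharge/transfer events
    "ORU": "EHR",  # observation results, e.g. from the LIS
    "ORM": "LIS",  # orders
}


def parse_msh(message: str) -> Dict[str, str]:
    """Extract a few header fields from the MSH segment of an HL7 v2 message."""
    first_segment = message.replace("\n", "\r").split("\r")[0]
    if not first_segment.startswith("MSH"):
        raise ValueError("HL7 v2 message must start with an MSH segment")
    field_sep = first_segment[3]  # MSH-1 is the separator itself, usually '|'
    fields = first_segment.split(field_sep)
    # Because MSH-1 is the separator, MSH-n sits at index n-1 after splitting.
    component_sep = fields[1][0]  # first encoding character, usually '^'
    msg_type = fields[8].split(component_sep)  # MSH-9
    return {
        "sending_app": fields[2],
        "message_type": msg_type[0],
        "trigger_event": msg_type[1] if len(msg_type) > 1 else "",
        "control_id": fields[9],
        "version": fields[11],
    }


def route(message: str) -> str:
    """Return the target system for a message, or a dead-letter queue name."""
    header = parse_msh(message)
    return ROUTES.get(header["message_type"], "DEAD_LETTER")


# Synthetic sample message (not real patient data).
SAMPLE = "MSH|^~\\&|LAB|HOSP|EHR|HOSP|20260419120000||ORU^R01|MSG0001|P|2.5\rPID|1||12345"
print(route(SAMPLE))
```

Sending unknown message types to a dead-letter destination, rather than dropping them, keeps unroutable clinical messages available for manual review.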

1.2 Why Is Healthcare IT Hard?

Regulatory Complexity:

┌──────────────────────────────────────────────────────────────┐
│ HIPAA (US) — Privacy + Security + Breach Notification        │
│ HITECH Act — EHR incentives + increased penalties            │
│ GDPR (EU) — Right to erasure (conflicts with EHR retention!) │
│ FDA regulations — Medical device software (SaMD)             │
│ State laws — California CMIA, Texas HB 300                   │
└──────────────────────────────────────────────────────────────┘

Technical Challenges:

  • Legacy Systems: Most hospitals still run HL7 v2 (a standard that dates back to the late 1980s!)
  • Vendor Lock-in: Epic and Cerner hold 50%+ of the US market, with proprietary APIs
  • Data Silos: Each department runs its own system, which makes integration a nightmare
  • High Availability: 99.99% uptime requirement (about 4 minutes of downtime per month)
  • Latency Sensitivity: Clinical decision support needs responses in < 200ms
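The downtime figure behind an availability target is simple arithmetic. The sketch below assumes a 30-day month and a 365-day year; the function name is illustrative.

```python
def downtime_budget_minutes(availability: float, period_days: float) -> float:
    """Minutes of allowed downtime for a given availability over a period."""
    if not 0.0 < availability <= 1.0:
        raise ValueError("availability must be in (0, 1]")
    return (1.0 - availability) * period_days * 24 * 60


per_month = downtime_budget_minutes(0.9999, 30)   # roughly 4.3 minutes
per_year = downtime_budget_minutes(0.9999, 365)   # roughly 52.6 minutes
print(f"{per_month:.2f} min/month, {per_year:.2f} min/year")
```

A single failed deployment or database failover can use up the whole monthly budget, which is why planned maintenance usually has to happen without downtime at this availability level.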

Engineering Trade-offs:

Security      ←→  Usability
  (many auth steps)  ←→  (doctors need fast access in an emergency)

Compliance    ←→  Innovation Speed
  (audit every change)  ←→  (deploy hotfixes quickly)

De-identification  ←→  Clinical Utility
  (remove PII for research)  ←→  (contextual info needed)

2. HIPAA Compliance: Privacy and Security

2.1 What Is HIPAA and Why Must Engineers Know It?

HIPAA (Health Insurance Portability and Accountability Act) has 3 main rules:

  1. Privacy Rule: Defines who may access PHI (Protected Health Information)
  2. Security Rule: Technical requirements for protecting ePHI (electronic PHI)
  3. Breach Notification Rule: Notification without unreasonable delay, and within 60 days of discovering a data breach

PHI (Protected Health Information) covers 18 identifiers:

1. Names                         10. Account numbers
2. Geographic subdivisions       11. Certificate/license numbers
3. Dates (birth, death, admit)   12. Vehicle identifiers (VIN, plate)
4. Phone numbers                 13. Device identifiers/serial numbers
5. Fax numbers                   14. URLs
6. Email addresses               15. IP addresses
7. SSN                           16. Biometric identifiers (fingerprint)
8. MRN (Medical Record Number)   17. Full-face photos
9. Health plan beneficiary no.   18. Any other unique identifying number, characteristic, or code

Important: even a patient's IP address counts as PHI under HIPAA once it is linked to health information!
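As a rough illustration of how pattern-shaped identifiers from this list can be masked in free text, here is a small regex-based sketch. The patterns and the function name `redact_identifiers` are assumptions for illustration only. They cover just four identifier types in US-style formats and do nothing for names, dates, or addresses, so this is not a complete de-identification method.

```python
import re
from typing import List, Pattern, Tuple

# Illustrative patterns only: real de-identification needs far broader coverage.
_PATTERNS: List[Tuple[str, Pattern[str]]] = [
    ("EMAIL", re.compile(r"\b[\w.+-]+@[\w-]+(?:\.[\w-]+)+\b")),
    ("SSN", re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),
    ("IP", re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")),
    ("PHONE", re.compile(r"\b\d{3}[-. ]\d{3}[-. ]\d{4}\b")),
]


def redact_identifiers(text: str) -> str:
    """Replace each matched identifier with a bracketed type label."""
    for label, pattern in _PATTERNS:
        text = pattern.sub(f"[{label}]", text)
    return text


note = "Contact jane@example.com or 555-123-4567, SSN 123-45-6789, from 10.0.0.12"
print(redact_identifiers(note))
```

Pattern matching only catches identifiers with a recognizable shape. Free-text names and locations usually need dictionary or NLP-based approaches plus human review.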

2.2 Security Rule — Technical Requirements

The HIPAA Security Rule has 3 categories with a total of 18 standards:

A. Administrative Safeguards

✓ Security Management Process
  - Risk Analysis (annual assessment)
  - Risk Management (mitigation plan)
  - Sanction Policy (employee violations)
  - Information System Activity Review (audit logs)

✓ Workforce Security
  - Authorization/Supervision
  - Workforce Clearance (background check)
  - Termination Procedures (revoke access immediately)

✓ Access Management
  - Isolate healthcare clearinghouse functions
  - Access Authorization (role-based access)
  - Access Establishment/Modification

B. Physical Safeguards

✓ Facility Access Controls
  - Badge systems, biometric access
  - Visitor logs, escort procedures
  - Secure disposal (shred PHI, wipe drives)

✓ Workstation Security
  - Auto-lock after 5 min idle
  - Privacy screens in public areas
  - No unauthorized USB devices

✓ Device and Media Controls
  - Encrypt all portable devices (laptops, USB)
  - Serial number tracking
  - Secure decommissioning (NIST 800-88 wipe standards)

C. Technical Safeguards (engineer's focus!)

✓ Access Control
  - Unique user identification (no shared accounts!)
  - Emergency access procedure (break-glass)
  - Auto logoff (session timeout)
  - Encryption and decryption

✓ Audit Controls
  - Log access to PHI (who, what, when, where)
  - Detect unauthorized access attempts
  - Retain logs 6 years minimum

✓ Integrity Controls
  - Detect unauthorized alterations
  - Digital signatures for critical data

✓ Transmission Security
  - TLS 1.2 or later for data in transit (TLS 1.3 preferred)
  - No unencrypted email of PHI
  - VPN for remote access
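The audit and integrity controls above can be combined in a tamper-evident log. The sketch below chains entries with SHA-256 so that any later modification breaks verification. The field names and the in-memory list are assumptions; a real system would write to append-only storage and protect the chain head separately.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Dict, List

GENESIS_HASH = "0" * 64  # placeholder "previous hash" for the first entry


def _entry_hash(body: Dict[str, str]) -> str:
    """Hash a canonical JSON encoding of the entry body."""
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def append_entry(log: List[Dict[str, str]], user_id: str, patient_id: str,
                 action: str, source_ip: str) -> Dict[str, str]:
    """Append a who/what/when/where record linked to the previous entry."""
    body = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "patient_id": patient_id,
        "action": action,
        "source_ip": source_ip,
        "prev_hash": log[-1]["hash"] if log else GENESIS_HASH,
    }
    entry = dict(body, hash=_entry_hash(body))
    log.append(entry)
    return entry


def verify_chain(log: List[Dict[str, str]]) -> bool:
    """Return False if any entry was altered, removed, or reordered."""
    prev = GENESIS_HASH
    for entry in log:
        body = {k: v for k, v in entry.items() if k != "hash"}
        if entry["prev_hash"] != prev or _entry_hash(body) != entry["hash"]:
            return False
        prev = entry["hash"]
    return True
```

A hash chain detects tampering but does not prevent it. Someone with write access could rebuild the whole chain, so the latest hash should also be stored somewhere the application cannot modify.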

2.3 Encryption Requirements — Technical Implementation

At Rest Encryption:

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os

class PHIEncryption:
    """
    HIPAA-compliant encryption for PHI at rest.
    Uses AES-256-GCM (a FIPS-approved algorithm; FIPS 140-2 validation
    applies to the cryptographic module, not to the algorithm itself).
    """
    
    def __init__(self, master_key: bytes):
        """
        master_key should come from KMS (AWS KMS, Azure Key Vault, HSM)
        and be rotated every 90 days per security policy.
        """
        if len(master_key) != 32:
            raise ValueError("AES-256 requires a 32-byte key")
        self.master_key = master_key
    
    def encrypt_phi(self, plaintext: str, patient_id: str) -> dict:
        """
        Encrypt PHI with authenticated encryption.
        Returns: dict with ciphertext (GCM tag appended), nonce, aad,
        key_version, and algorithm.
        """
        # Generate unique nonce (96-bit for GCM)
        nonce = os.urandom(12)
        
        # Use patient_id as additional authenticated data (AAD)
        # This binds ciphertext to specific patient
        aad = patient_id.encode('utf-8')
        
        # Encrypt with AES-256-GCM
        aesgcm = AESGCM(self.master_key)
        ciphertext = aesgcm.encrypt(
            nonce, 
            plaintext.encode('utf-8'), 
            aad
        )
        
        return {
            'ciphertext': ciphertext.hex(),
            'nonce': nonce.hex(),
            'aad': patient_id,
            'key_version': 'v1',  # for key rotation tracking
            'algorithm': 'AES-256-GCM'
        }
    
    def decrypt_phi(self, encrypted_data: dict) -> str:
        """
        Decrypt PHI and verify authenticity.
        Raises: InvalidTag if ciphertext was tampered with.
        """
        nonce = bytes.fromhex(encrypted_data['nonce'])
        ciphertext = bytes.fromhex(encrypted_data['ciphertext'])
        aad = encrypted_data['aad'].encode('utf-8')
        
        aesgcm = AESGCM(self.master_key)
        plaintext = aesgcm.decrypt(nonce, ciphertext, aad)
        
        return plaintext.decode('utf-8')


# Database schema for encrypted PHI (PostgreSQL syntax, kept here as a string)
"""
CREATE TABLE patient_records (
    patient_id          UUID PRIMARY KEY,
    encrypted_ssn       BYTEA NOT NULL,  -- encrypted at application layer
    ssn_nonce           BYTEA NOT NULL,  -- one nonce per ciphertext: never reuse a GCM nonce with the same key
    encrypted_diagnosis BYTEA NOT NULL,
    diagnosis_nonce     BYTEA NOT NULL,
    key_version         VARCHAR(10) NOT NULL,
    created_at          TIMESTAMPTZ NOT NULL,
    
    -- Non-PHI fields can be plaintext for indexing
    facility_id         UUID NOT NULL,
    record_type         VARCHAR(50) NOT NULL,
    
    -- Audit metadata
    created_by          UUID NOT NULL,
    last_accessed_at    TIMESTAMPTZ,
    last_accessed_by    UUID
);

-- Storage-level encryption complements application-layer encryption.
-- Community PostgreSQL has no built-in Transparent Data Encryption (TDE);
-- use encrypted volumes or a managed service's storage encryption instead.
-- On SQL Server, TDE is enabled with:
--   ALTER DATABASE healthcare_db SET ENCRYPTION ON;
"""
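The `key_version` field stored with each record is what makes key rotation workable: new data is encrypted under the current key, while old ciphertexts stay decryptable under the key named in their record. The sketch below is a hypothetical in-memory stand-in for a KMS. The `KeyRing` class and its method names are assumptions, and a real deployment would never hold raw master keys in process memory like this.

```python
import os
from typing import Dict


class KeyRing:
    """In-memory stand-in for a KMS: maps key_version -> 32-byte AES-256 key."""

    def __init__(self) -> None:
        self._keys: Dict[str, bytes] = {}
        self.current_version = ""

    def rotate(self) -> str:
        """Create a new key version and make it current; old keys are kept."""
        version = f"v{len(self._keys) + 1}"
        self._keys[version] = os.urandom(32)
        self.current_version = version
        return version

    def key_for_encrypt(self) -> bytes:
        """New ciphertexts always use the current key."""
        if not self.current_version:
            raise RuntimeError("no key available; call rotate() first")
        return self._keys[self.current_version]

    def key_for_decrypt(self, key_version: str) -> bytes:
        """Old ciphertexts are decrypted with the key named in their record."""
        if key_version not in self._keys:
            raise KeyError(f"unknown key_version: {key_version}")
        return self._keys[key_version]


ring = KeyRing()
first = ring.rotate()    # 'v1'
second = ring.rotate()   # 'v2'; records tagged 'v1' remain readable
```

With this in place, the hard-coded `'key_version': 'v1'` in `encrypt_phi` would become the ring's `current_version`, and `decrypt_phi` would look up the key from `encrypted_data['key_version']`.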

In Transit Encryption:

# NGINX config for HIPAA-compliant TLS
server {
    listen 443 ssl http2;
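    server_name api.hospital.com;
    
    # Certificate and key are required; these paths are placeholders
    ssl_certificate     /etc/nginx/tls/api.hospital.com.crt;
    ssl_certificate_key /etc/nginx/tls/api.hospital.com.key;
    
    # TLS 1.3 preferred; TLS 1.2 still accepted for older clients
    ssl_protocols TLSv1.3 TLSv1.2;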
    
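    # AES-GCM suites only (ChaCha20-Poly1305 is not a FIPS-approved algorithm).
    # ssl_ciphers applies to TLS 1.2; TLS 1.3 suites are set through
    # ssl_conf_command (nginx 1.19.4+).
    ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256';
    ssl_conf_command Ciphersuites TLS_AES_256_GCM_SHA384:TLS_AES_128_GCM_SHA256;
    ssl_prefer_server_ciphers on;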
    
    # HSTS — force HTTPS for 2 years
    add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
    
    # HTTP Public Key Pinning (the Public-Key-Pins header) is deprecated and no
    # longer honored by major browsers, so it is omitted here. If pinning is
    # required, pin certificates inside first-party mobile or API clients.
    
    # Security headers
    add_header X-Frame-Options "DENY" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header Content-Security-Policy "default-src 'self'" always;
    
    # Disable TLS session tickets (forward secrecy)
    ssl_session_tickets off;
    
    location /api/ {
        proxy_pass http://backend:8080;
        
        # Log PHI access for audit
        access_log /var/log/nginx/phi-access.log combined;
    }
}
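The same protocol floor should be enforced by clients that call the API. A minimal sketch using Python's standard `ssl` module is shown below; the function name is an assumption, and whether to require TLS 1.3 outright depends on which clients and servers must be supported.

```python
import ssl


def make_client_context() -> ssl.SSLContext:
    """TLS client context that verifies certificates and refuses TLS < 1.2."""
    # create_default_context() enables certificate and hostname verification.
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


ctx = make_client_context()
# The context can be passed to urllib.request.urlopen(url, context=ctx)
# or to http.client.HTTPSConnection(host, context=ctx).
```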

2.4 Access Control — Role-Based Access Control (RBAC)

-- RBAC schema for healthcare system
CREATE TABLE roles (
    role_id     UUID PRIMARY KEY,
    role_name   VARCHAR(100) NOT NULL UNIQUE,
    description TEXT,
    
    -- HIPAA: Document minimum necessary principle
    data_access_scope VARCHAR(50) NOT NULL,  -- 'OWN_PATIENTS', 'DEPARTMENT', 'FACILITY', 'ALL', 'NONE'
    
    CONSTRAINT chk_scope CHECK (data_access_scope IN (
        'OWN_PATIENTS',    -- Primary care provider
        'DEPARTMENT',      -- ED doctors see all ED patients
        'FACILITY',        -- Hospital administrator
        'ALL',             -- System admin (requires justification)
        'NONE'             -- No identifiable PHI (e.g., researchers on de-identified data)
    ))
);

-- Pre-defined roles
INSERT INTO roles VALUES
    ('...', 'PHYSICIAN', 'Licensed physician', 'OWN_PATIENTS'),
    ('...', 'NURSE', 'Registered nurse', 'OWN_PATIENTS'),
    ('...', 'ED_PHYSICIAN', 'Emergency dept physician', 'DEPARTMENT'),
    ('...', 'RADIOLOGIST', 'Radiologist', 'DEPARTMENT'),
    ('...', 'LAB_TECH', 'Lab technician', 'DEPARTMENT'),
    ('...', 'BILLING_STAFF', 'Billing department', 'FACILITY'),
    ('...', 'RESEARCHER', 'Research staff (de-identified data only)', 'NONE'),
    ('...', 'SYSTEM_ADMIN', 'IT administrator', 'ALL');

CREATE TABLE permissions (
    permission_id   UUID PRIMARY KEY,
    resource_type   VARCHAR(100) NOT NULL,  -- 'PATIENT_RECORD', 'LAB_RESULT', etc.
    action          VARCHAR(50) NOT NULL,   -- 'READ', 'WRITE', 'DELETE', 'EXPORT'
    conditions      JSONB,                  -- context-based access control
    
    CONSTRAINT chk_action CHECK (action IN ('READ', 'WRITE', 'DELETE', 'EXPORT'))
);

CREATE TABLE role_permissions (
    role_id         UUID REFERENCES roles(role_id),
    permission_id   UUID REFERENCES permissions(permission_id),
    granted_at      TIMESTAMPTZ DEFAULT now(),
    granted_by      UUID NOT NULL,  -- who granted this permission
    justification   TEXT NOT NULL,  -- HIPAA: document why access needed
    
    PRIMARY KEY (role_id, permission_id)
);

CREATE TABLE user_roles (
    user_id         UUID NOT NULL,
    role_id         UUID REFERENCES roles(role_id),
    assigned_at     TIMESTAMPTZ DEFAULT now(),
    assigned_by     UUID NOT NULL,
    expires_at      TIMESTAMPTZ,  -- temporary access (e.g., locum doctors)
    department_id   UUID,         -- scope limitation
    
    PRIMARY KEY (user_id, role_id)
);
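The `data_access_scope` values only matter once the application enforces them. The sketch below shows one way the scope check could look. The `User` and `PatientContext` classes and their fields are assumptions for illustration, and the real check would also consult `permissions.conditions` and role expiry.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class User:
    user_id: str
    scope: str                      # value of roles.data_access_scope
    department_id: str
    facility_id: str
    own_patient_ids: Set[str] = field(default_factory=set)


@dataclass
class PatientContext:
    patient_id: str
    department_id: str              # department of the current encounter
    facility_id: str


def can_access(user: User, patient: PatientContext) -> bool:
    """Apply the minimum-necessary scope; anything unrecognized is denied."""
    if user.scope == "ALL":
        return True
    if user.scope == "FACILITY":
        return user.facility_id == patient.facility_id
    if user.scope == "DEPARTMENT":
        return (user.facility_id == patient.facility_id
                and user.department_id == patient.department_id)
    if user.scope == "OWN_PATIENTS":
        return patient.patient_id in user.own_patient_ids
    return False  # 'NONE' and unknown scopes get no access to identifiable PHI
```

Denying by default means a typo in a scope value or a newly added scope fails closed instead of silently granting access.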

Emergency Access (Break-Glass):

from typing import Optional
from datetime import datetime, timedelta
import logging

class BreakGlassAccess:
    """
    Emergency access mechanism for life-threatening situations.
    
    HIPAA allows emergency access to PHI, but requires:
    1. Detailed audit logging
    2. Post-access review
    3. Justification documentation
    """
    
    def grant_emergency_access(
        self,
        user_id: str,
        patient_id: str,
        justification: str,
        requesting_officer: str
    ) -> str:
        """
        Grant temporary elevated access in emergency.
        Returns: access_token valid for 1 hour
        """
        
        # Log the break-glass event IMMEDIATELY
        audit_id = self.log_break_glass_event(
            user_id=user_id,
            patient_id=patient_id,
            justification=justification,
            requesting_officer=requesting_officer
        )
        
        # Alert security team in real-time
        self.send_alert_to_security_team(audit_id)
        
        # Grant temporary access
        access_token = self.create_temporary_token(
            user_id=user_id,
            patient_id=patient_id,
            expires_in=timedelta(hours=1),
            access_level='EMERGENCY_FULL'
        )
        
        # Schedule automatic review
        self.schedule_post_access_review(
            audit_id=audit_id,
            review_deadline=datetime.now().astimezone() + timedelta(hours=24)  # timezone-aware deadline
        )
        
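        # Keep patient identifiers out of general application logs;
        # the audit record referenced by audit_id holds the full details.
        logging.critical(
            "BREAK-GLASS ACCESS GRANTED: user=%s officer=%s audit_id=%s",
            user_id, requesting_officer, audit_id
        )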
        
        return access_token
    
    def post_access_review(self, audit_id: str, reviewer_id: str, verdict: str):
        """
        Mandatory review within 24h after break-glass access.
        
        Verdict options:
        - JUSTIFIED: Emergency was legitimate
        - UNJUSTIFIED: Inappropriate access → HR referral
        - REQUIRES_INVESTIGATION: Escalate to compliance
        """
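        if verdict not in ('JUSTIFIED', 'UNJUSTIFIED', 'REQUIRES_INVESTIGATION'):
            raise ValueError(f"Unknown verdict: {verdict}")
        
        if verdict == 'UNJUSTIFIED':
            self.trigger_hr_investigation(audit_id)
            self.send_breach_notification_if_needed(audit_id)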
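The class above leaves `create_temporary_token` undefined. One possible shape, sketched here with only the standard library, is an HMAC-signed token that carries its own expiry. The token format, the `SECRET` constant, and `verify_token` are assumptions for illustration; a production system would more likely use an established token format and a key from a KMS.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET = b"demo-only-secret"  # assumption: a real signing key comes from a KMS


def create_temporary_token(user_id: str, patient_id: str, ttl_seconds: int,
                           now: Optional[float] = None) -> str:
    """Issue a signed token scoped to one user and one patient."""
    issued = time.time() if now is None else now
    payload = json.dumps({
        "user_id": user_id,
        "patient_id": patient_id,
        "access_level": "EMERGENCY_FULL",
        "exp": issued + ttl_seconds,
    }, sort_keys=True).encode("utf-8")
    signature = hmac.new(SECRET, payload, hashlib.sha256).digest()
    return (base64.urlsafe_b64encode(payload).decode("ascii") + "."
            + base64.urlsafe_b64encode(signature).decode("ascii"))


def verify_token(token: str, now: Optional[float] = None) -> Optional[dict]:
    """Return the claims if the signature is valid and the token is unexpired."""
    try:
        payload_b64, signature_b64 = token.split(".")
        payload = base64.urlsafe_b64decode(payload_b64)
        signature = base64.urlsafe_b64decode(signature_b64)
    except ValueError:
        return None  # malformed token
    expected = hmac.new(SECRET, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(signature, expected):
        return None  # tampered or signed with another key
    claims = json.loads(payload)
    current = time.time() if now is None else now
    return claims if current < claims["exp"] else None
```

Binding the token to a single `patient_id` keeps a break-glass grant from turning into general elevated access for the full hour.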

2.5 Business Associate Agreement (BAA)

When working with third-party services (AWS, Google Cloud, Twilio, etc.), you MUST have a BAA (Business Associate Agreement) in place before they process PHI.

Common mistake:

# ❌ HIPAA VIOLATION: PHI sent to a third party with no BAA in place
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def get_diagnosis_suggestion(symptoms: str) -> str:
    # Sending PHI to a third party without a signed BAA
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Patient symptoms: {symptoms}"}]
    )
    return response.choices[0].message.content


# ✅ CORRECT: use a BAA-covered service or de-identify first
def get_diagnosis_suggestion_hipaa_compliant(symptoms: str) -> str:
    # De-identify before the text leaves your environment
    # (remove_phi_from_text is a placeholder for your de-identification step)
    de_identified = remove_phi_from_text(symptoms)
    
    # Send only to a HIPAA-eligible AI service covered by a signed BAA
    # (compliant_ai_service is a placeholder client)
    response = compliant_ai_service.analyze(de_identified)
    return response

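Services with BAA available (vendor terms change, so verify before relying on this list):

  • AWS (S3, RDS, Lambda, etc.): accept the BAA in AWS Artifact; only HIPAA-eligible services are covered
  • Google Cloud (GCP): accept the BAA in the admin console
  • Azure: the BAA is included in Microsoft's standard online services terms
  • Twilio: Enterprise plan with HIPAA compliance
  • ⚠️ OpenAI: BAA available on request for eligible API and enterprise offerings only; consumer ChatGPT is not covered
  • ❌ Google Analytics: no BAA offered; check any other analytics vendor individually before sending PHI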
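One way to keep this rule from depending on individual developers remembering it is a guard in the outbound integration layer. The sketch below is hypothetical: the registry contents, the `BAARequiredError` class, and the `transport` callable are all assumptions, and the registry would in practice be owned by the compliance team.

```python
from typing import Callable, Set


class BAARequiredError(RuntimeError):
    """Raised when PHI would be sent to a vendor without a signed BAA."""


# Hypothetical registry of vendors with a signed BAA on file.
BAA_COVERED_VENDORS: Set[str] = {"aws", "gcp"}


def send_to_vendor(vendor: str, payload: str, contains_phi: bool,
                   transport: Callable[[str, str], str]) -> str:
    """Forward a payload only if it is PHI-free or the vendor has a BAA."""
    if contains_phi and vendor.lower() not in BAA_COVERED_VENDORS:
        raise BAARequiredError(f"No BAA on file for vendor '{vendor}'")
    return transport(vendor, payload)


def fake_transport(vendor: str, payload: str) -> str:
    """Stand-in for a real HTTP client, used only for this example."""
    return f"sent {len(payload)} bytes to {vendor}"


print(send_to_vendor("aws", "lab result", contains_phi=True, transport=fake_transport))
```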

3. EHR Architecture — Electronic Health Records

3.1 Data Model — Clinical Data Representation

An EHR is not just a "database that stores patient info": it is a complex domain model with temporal data, versioning, and rich relationships.

-- Core patient demographics
CREATE TABLE patients (
    patient_id          UUID PRIMARY KEY,
    mrn                 VARCHAR(20) UNIQUE NOT NULL,  -- Medical Record Number
    encrypted_ssn       BYTEA,
    encrypted_first_name BYTEA NOT NULL,
    encrypted_last_name  BYTEA NOT NULL,
    date_of_birth       DATE NOT NULL,
    gender              VARCHAR(20),
    encrypted_address   BYTEA,
    encrypted_phone     BYTEA,
    
    -- Clinical identifiers
    blood_type          VARCHAR(5),  -- 'A+', 'O-', etc.
    organ_donor         BOOLEAN,
    
    -- Metadata
    created_at          TIMESTAMPTZ NOT NULL,
    updated_at          TIMESTAMPTZ NOT NULL,
    is_deceased         BOOLEAN DEFAULT FALSE,
    deceased_date       DATE,
    
    -- Audit trail
    created_by          UUID NOT NULL,
    facility_id         UUID NOT NULL
);

-- Encounters — Each hospital visit or appointment
CREATE TABLE encounters (
    encounter_id        UUID PRIMARY KEY,
    patient_id          UUID NOT NULL REFERENCES patients(patient_id),
    encounter_type      VARCHAR(50) NOT NULL,  -- 'INPATIENT', 'OUTPATIENT', 'EMERGENCY'
    admission_time      TIMESTAMPTZ NOT NULL,
    discharge_time      TIMESTAMPTZ,
    chief_complaint     TEXT,  -- Why patient came in
    
    -- Care team
    attending_physician UUID NOT NULL,
    admitting_physician UUID,
    consulting_physicians UUID[],
    primary_nurse       UUID,
    
    -- Location tracking
    department_id       UUID NOT NULL,
    room_number         VARCHAR(20),
    bed_number          VARCHAR(20),
    
    -- Clinical status
    acuity_level        INTEGER,  -- 1-5, ESI (Emergency Severity Index)
    isolation_required  BOOLEAN DEFAULT FALSE,
    isolation_type      VARCHAR(50),  -- 'CONTACT', 'DROPLET', 'AIRBORNE'
    
    -- Disposition
    discharge_disposition VARCHAR(50),  -- 'HOME', 'TRANSFERRED', 'ADMITTED', 'DECEASED'
    
    CONSTRAINT chk_encounter_type CHECK (encounter_type IN (
        'INPATIENT', 'OUTPATIENT', 'EMERGENCY', 'OBSERVATION', 'TELEMEDICINE'
    ))
);

-- Clinical observations (vitals, measurements)
CREATE TABLE observations (
    observation_id      UUID PRIMARY KEY,
    encounter_id        UUID NOT NULL REFERENCES encounters(encounter_id),
    patient_id          UUID NOT NULL REFERENCES patients(patient_id),
    
    -- LOINC code (Logical Observation Identifiers Names and Codes)
    loinc_code          VARCHAR(20) NOT NULL,  -- e.g., '8480-6' for Systolic BP
    observation_type    VARCHAR(100) NOT NULL,
    
    -- Value (polymorphic — can be numeric, text, or coded)
    value_numeric       DECIMAL(10,2),
    value_text          TEXT,
    value_code          VARCHAR(50),
    value_unit          VARCHAR(20),  -- 'mmHg', 'mg/dL', 'bpm'
    
    -- Reference ranges
    reference_low       DECIMAL(10,2),
    reference_high      DECIMAL(10,2),
    is_abnormal         BOOLEAN,
    
    -- Time and performer
    observed_at         TIMESTAMPTZ NOT NULL,
    observed_by         UUID NOT NULL,
    
    -- Status
    status              VARCHAR(20) NOT NULL,  -- 'PRELIMINARY', 'FINAL', 'AMENDED', 'CANCELLED'
    
    CONSTRAINT chk_status CHECK (status IN ('PRELIMINARY', 'FINAL', 'AMENDED', 'CANCELLED'))
);

-- Example LOINC codes:
-- 8480-6: Systolic blood pressure
-- 8462-4: Diastolic blood pressure
-- 8867-4: Heart rate
-- 2339-0: Glucose [Mass/volume] in Blood
-- 718-7: Hemoglobin [Mass/volume] in Blood

-- Medications (prescriptions and administrations)
CREATE TABLE medication_orders (
    order_id            UUID PRIMARY KEY,
    encounter_id        UUID NOT NULL REFERENCES encounters(encounter_id),
    patient_id          UUID NOT NULL REFERENCES patients(patient_id),
    
    -- Drug identification (RxNorm codes)
    rxnorm_code         VARCHAR(20) NOT NULL,
    medication_name     VARCHAR(200) NOT NULL,
    generic_name        VARCHAR(200),
    
    -- Dosage
    dose                DECIMAL(10,2) NOT NULL,
    dose_unit           VARCHAR(20) NOT NULL,  -- 'mg', 'mL', 'tablets'
    route               VARCHAR(50) NOT NULL,  -- 'ORAL', 'IV', 'IM'
    frequency           VARCHAR(50) NOT NULL,  -- 'BID', 'TID', 'Q6H'
    
    -- Schedule
    start_date          TIMESTAMPTZ NOT NULL,
    end_date            TIMESTAMPTZ,
    is_prn              BOOLEAN DEFAULT FALSE,  -- "as needed"
    prn_reason          TEXT,
    
    -- Safety
    indication          TEXT NOT NULL,
    allergy_checked     BOOLEAN NOT NULL DEFAULT FALSE,
    interaction_checked BOOLEAN NOT NULL DEFAULT FALSE,
    
    -- Prescriber
    ordered_by          UUID NOT NULL,
    ordered_at          TIMESTAMPTZ NOT NULL,
    
    -- Status
    status              VARCHAR(20) NOT NULL,
    discontinue_reason  TEXT,
    
    CONSTRAINT chk_route CHECK (route IN (
        'ORAL', 'IV', 'IM', 'SC', 'TOPICAL', 'INHALATION', 'RECTAL', 'OPHTHALMIC'
    )),
    CONSTRAINT chk_status CHECK (status IN (
        'ACTIVE', 'COMPLETED', 'DISCONTINUED', 'ON_HOLD', 'CANCELLED'
    ))
);

CREATE TABLE medication_administrations (
    administration_id   UUID PRIMARY KEY,
    order_id            UUID NOT NULL REFERENCES medication_orders(order_id),
    patient_id          UUID NOT NULL REFERENCES patients(patient_id),
    
    -- What was given
    dose_given          DECIMAL(10,2) NOT NULL,
    dose_unit           VARCHAR(20) NOT NULL,
    route_used          VARCHAR(50) NOT NULL,
    
    -- When and by whom
    administered_at     TIMESTAMPTZ NOT NULL,
    administered_by     UUID NOT NULL,
    
    -- Barcode verification (5 rights check)
    patient_barcode_scanned BOOLEAN NOT NULL,
    medication_barcode_scanned BOOLEAN NOT NULL,
    
    -- Outcome
    status              VARCHAR(20) NOT NULL,  -- 'GIVEN', 'REFUSED', 'HELD', 'WASTED'
    refusal_reason      TEXT,
    adverse_reaction    TEXT,
    
    CONSTRAINT chk_status CHECK (status IN ('GIVEN', 'REFUSED', 'HELD', 'WASTED'))
);

-- Allergies and adverse reactions
CREATE TABLE allergies (
    allergy_id          UUID PRIMARY KEY,
    patient_id          UUID NOT NULL REFERENCES patients(patient_id),
    
    -- Allergen identification
    allergen_type       VARCHAR(50) NOT NULL,  -- 'MEDICATION', 'FOOD', 'ENVIRONMENT'
    allergen_code       VARCHAR(20),  -- RxNorm or SNOMED code
    allergen_name       VARCHAR(200) NOT NULL,
    
    -- Reaction
    reaction            TEXT NOT NULL,
    severity            VARCHAR(20) NOT NULL,  -- 'MILD', 'MODERATE', 'SEVERE', 'ANAPHYLAXIS'
    
    -- Onset and verification
    onset_date          DATE,
    verified_by         UUID,
    verification_status VARCHAR(20) NOT NULL,  -- 'CONFIRMED', 'SUSPECTED', 'REFUTED'
    
    -- Metadata
    recorded_at         TIMESTAMPTZ NOT NULL,
    recorded_by         UUID NOT NULL,
    is_active           BOOLEAN DEFAULT TRUE,
    
    CONSTRAINT chk_allergen_type CHECK (allergen_type IN (
        'MEDICATION', 'FOOD', 'ENVIRONMENT', 'OTHER'
    )),
    CONSTRAINT chk_severity CHECK (severity IN (
        'MILD', 'MODERATE', 'SEVERE', 'ANAPHYLAXIS'
    ))
);

-- Diagnoses (ICD-10 coded)
CREATE TABLE diagnoses (
    diagnosis_id        UUID PRIMARY KEY,
    encounter_id        UUID NOT NULL REFERENCES encounters(encounter_id),
    patient_id          UUID NOT NULL REFERENCES patients(patient_id),
    
    -- ICD-10 code (International Classification of Diseases)
    icd10_code          VARCHAR(10) NOT NULL,
    diagnosis_name      VARCHAR(200) NOT NULL,
    
    -- Type and status
    diagnosis_type      VARCHAR(20) NOT NULL,  -- 'ADMITTING', 'WORKING', 'FINAL'
    is_primary          BOOLEAN DEFAULT FALSE,
    
    -- Time
    diagnosed_at        TIMESTAMPTZ NOT NULL,
    diagnosed_by        UUID NOT NULL,
    resolved_at         TIMESTAMPTZ,
    
    -- Severity and status
    severity            VARCHAR(20),
    status              VARCHAR(20) NOT NULL,  -- 'ACTIVE', 'RESOLVED', 'INACTIVE'
    
    CONSTRAINT chk_type CHECK (diagnosis_type IN (
        'ADMITTING', 'WORKING', 'FINAL', 'DIFFERENTIAL'
    ))
);

-- Procedures (CPT coded for billing)
CREATE TABLE procedures (
    procedure_id        UUID PRIMARY KEY,
    encounter_id        UUID NOT NULL REFERENCES encounters(encounter_id),
    patient_id          UUID NOT NULL REFERENCES patients(patient_id),
    
    -- CPT code (Current Procedural Terminology)
    cpt_code            VARCHAR(10) NOT NULL,
    procedure_name      VARCHAR(200) NOT NULL,
    
    -- Scheduling
    scheduled_time      TIMESTAMPTZ,
    start_time          TIMESTAMPTZ,
    end_time            TIMESTAMPTZ,
    
    -- Performers
    primary_surgeon     UUID,
    assisting_surgeons  UUID[],
    anesthesiologist    UUID,
    
    -- Location
    operating_room      VARCHAR(20),
    
    -- Status
    status              VARCHAR(20) NOT NULL,
    cancellation_reason TEXT,
    
    -- Documentation
    procedure_note      TEXT,
    complications       TEXT,
    
    CONSTRAINT chk_status CHECK (status IN (
        'SCHEDULED', 'IN_PROGRESS', 'COMPLETED', 'CANCELLED', 'POSTPONED'
    ))
);
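In the `observations` table, `is_abnormal` is stored next to the reference range it was derived from. A small sketch of that derivation on the application side is shown below. The `Observation` class is an assumption that mirrors a few of the columns, and the reference range in the example is illustrative, not clinical guidance.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional


@dataclass(frozen=True)
class Observation:
    loinc_code: str
    value_numeric: Decimal
    value_unit: str
    reference_low: Optional[Decimal] = None
    reference_high: Optional[Decimal] = None

    @property
    def is_abnormal(self) -> Optional[bool]:
        """True/False when a range is known, None when it cannot be judged."""
        if self.reference_low is None and self.reference_high is None:
            return None
        below = self.reference_low is not None and self.value_numeric < self.reference_low
        above = self.reference_high is not None and self.value_numeric > self.reference_high
        return below or above


# Systolic blood pressure (LOINC 8480-6) with an illustrative reference range.
systolic = Observation("8480-6", Decimal("162"), "mmHg", Decimal("90"), Decimal("120"))
print(systolic.is_abnormal)
```

Returning `None` rather than `False` when no range exists avoids presenting an unjudged value as normal.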

3.2 Temporal Data and Versioning

Clinical data changes over time, and its history MUST be preserved (a medical-legal requirement).

-- Problem list (chronic conditions) with versioning
CREATE TABLE problem_list (
    problem_id          UUID NOT NULL,     -- stable across all versions of a problem
    patient_id          UUID NOT NULL REFERENCES patients(patient_id),
    version             INTEGER NOT NULL,  -- version counter
    
    -- Clinical info
    snomed_code         VARCHAR(20) NOT NULL,  -- SNOMED CT code
    problem_name        VARCHAR(200) NOT NULL,
    
    -- Time bounds
    onset_date          DATE,
    resolved_date       DATE,
    valid_from          TIMESTAMPTZ NOT NULL,  -- when this version became current
    valid_to            TIMESTAMPTZ,           -- NULL = current version
    
    -- Status
    status              VARCHAR(20) NOT NULL,
    severity            VARCHAR(20),
    
    -- Provenance
    recorded_by         UUID NOT NULL,
    updated_by          UUID,
    update_reason       TEXT,
    
    -- One row per (problem, version)
    PRIMARY KEY (problem_id, version),
    
    CONSTRAINT chk_status CHECK (status IN (
        'ACTIVE', 'CHRONIC', 'INTERMITTENT', 'RESOLVED', 'INACTIVE'
    ))
);

-- Ensure only one current version per problem.
-- PostgreSQL does not allow WHERE on a UNIQUE constraint, so use a partial unique index.
CREATE UNIQUE INDEX unique_current_version
    ON problem_list (problem_id)
    WHERE valid_to IS NULL;

-- Query current problems for a patient
CREATE VIEW current_problems AS
SELECT * FROM problem_list WHERE valid_to IS NULL;

-- Slowly Changing Dimension (SCD Type 2) pattern
-- Example: Patient changes address
CREATE TABLE patient_addresses_history (
    address_id          UUID PRIMARY KEY,
    patient_id          UUID NOT NULL REFERENCES patients(patient_id),
    
    encrypted_street    BYTEA NOT NULL,
    encrypted_city      BYTEA NOT NULL,
    encrypted_state     BYTEA NOT NULL,
    encrypted_zip       BYTEA NOT NULL,
    
    -- Temporal columns
    valid_from          TIMESTAMPTZ NOT NULL,
    valid_to            TIMESTAMPTZ,           -- NULL = current address
    is_current          BOOLEAN GENERATED ALWAYS AS (valid_to IS NULL) STORED,
    
    -- Audit
    updated_by          UUID NOT NULL,
    update_reason       VARCHAR(100)
);
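The versioning pattern relies on two steps happening atomically: closing the current row and inserting the new one. The sketch below demonstrates this with SQLite from Python's standard library so it can run anywhere. The trimmed-down `problem_versions` table and the `record_version` function are assumptions for illustration, not the full schema above.

```python
import sqlite3

SCHEMA = """
CREATE TABLE problem_versions (
    problem_id TEXT NOT NULL,
    version    INTEGER NOT NULL,
    status     TEXT NOT NULL,
    valid_from TEXT NOT NULL,
    valid_to   TEXT,               -- NULL = current version
    PRIMARY KEY (problem_id, version)
);
-- At most one open (current) row per problem.
CREATE UNIQUE INDEX one_current_version
    ON problem_versions (problem_id) WHERE valid_to IS NULL;
"""


def record_version(conn: sqlite3.Connection, problem_id: str,
                   status: str, at: str) -> int:
    """Close the current version (if any) and insert the next one atomically."""
    with conn:  # commits on success, rolls back on any exception
        row = conn.execute(
            "SELECT COALESCE(MAX(version), 0) FROM problem_versions WHERE problem_id = ?",
            (problem_id,),
        ).fetchone()
        next_version = row[0] + 1
        conn.execute(
            "UPDATE problem_versions SET valid_to = ? "
            "WHERE problem_id = ? AND valid_to IS NULL",
            (at, problem_id),
        )
        conn.execute(
            "INSERT INTO problem_versions (problem_id, version, status, valid_from, valid_to) "
            "VALUES (?, ?, ?, ?, NULL)",
            (problem_id, next_version, status, at),
        )
    return next_version


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
record_version(conn, "p-1", "ACTIVE", "2026-01-01T00:00:00Z")
record_version(conn, "p-1", "RESOLVED", "2026-03-01T00:00:00Z")
```

Old rows are never updated beyond setting `valid_to`, so the state of the problem list at any past moment can be reconstructed by filtering on `valid_from` and `valid_to`.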

3.3 Clinical Decision Support (CDS)

from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class Alert:
    severity: str  # 'INFO', 'WARNING', 'CRITICAL'
    message: str
    recommendation: str
    evidence: str
    override_allowed: bool

class ClinicalDecisionSupport:
    """
    Real-time clinical alerts and treatment recommendations.
    
    Common CDS rules:
    1. Drug-drug interactions
    2. Drug-allergy checking
    3. Duplicate therapy detection
    4. Dosage range checking
    5. Lab value monitoring
    6. Clinical guidelines compliance
    """
    
    def check_medication_order(
        self,
        patient_id: str,
        medication: dict,
        current_medications: List[dict],
        allergies: List[dict],
        lab_results: List[dict]
    ) -> List[Alert]:
        """
        Run all medication safety checks before order is finalized.
        """
        alerts = []
        
        # 1. Allergy check (highest priority)
        allergy_alert = self.check_allergies(medication, allergies)
        if allergy_alert:
            alerts.append(allergy_alert)
        
        # 2. Drug-drug interactions
        interaction_alerts = self.check_interactions(medication, current_medications)
        alerts.extend(interaction_alerts)
        
        # 3. Duplicate therapy
        duplicate_alert = self.check_duplicate_therapy(medication, current_medications)
        if duplicate_alert:
            alerts.append(duplicate_alert)
        
        # 4. Renal dosing adjustment
        renal_alert = self.check_renal_dosing(medication, lab_results)
        if renal_alert:
            alerts.append(renal_alert)
        
        # 5. Pregnancy category check
        pregnancy_alert = self.check_pregnancy_safety(patient_id, medication)
        if pregnancy_alert:
            alerts.append(pregnancy_alert)
        
        return alerts
    
    def check_allergies(self, medication: dict, allergies: List[dict]) -> Optional[Alert]:
        """
        Check if patient has allergy to ordered medication or drug class.
        """
        med_rxnorm = medication['rxnorm_code']
        
        for allergy in allergies:
            if not allergy['is_active']:
                continue
            
            # Direct match
            if allergy['allergen_code'] == med_rxnorm:
                return Alert(
                    severity='CRITICAL',
                    message=f"⚠️ ALLERGY ALERT: Patient allergic to {medication['name']}",
                    recommendation="DO NOT ADMINISTER. Consider alternative medication.",
                    evidence=f"Documented reaction: {allergy['reaction']} ({allergy['severity']})",
                    override_allowed=False  # Hard stop
                )
            
            # Cross-allergy check (e.g., Penicillin → Cephalosporins)
            if self.check_cross_allergy(med_rxnorm, allergy['allergen_code']):
                return Alert(
                    severity='WARNING',
                    message=f"Cross-allergy risk: Patient allergic to {allergy['allergen_name']}",
                    recommendation="Use with caution. Consider desensitization protocol.",
                    evidence="Known cross-reactivity between drug classes",
                    override_allowed=True  # Provider can override with justification
                )
        
        return None
    
    def check_interactions(
        self,
        new_med: dict,
        current_meds: List[dict]
    ) -> List[Alert]:
        """
        Check for drug-drug interactions.
        Uses FDA interaction tables and clinical databases.
        """
        alerts = []
        
        for med in current_meds:
            if med['status'] != 'ACTIVE':
                continue
            
            # Query interaction database (simplified)
            interaction = self.drug_interaction_db.query(
                drug1=new_med['rxnorm_code'],
                drug2=med['rxnorm_code']
            )
            
            if interaction:
                if interaction['severity'] == 'SEVERE':
                    alerts.append(Alert(
                        severity='CRITICAL',
                        message=f"⚠️ SEVERE INTERACTION: {new_med['name']} + {med['name']}",
                        recommendation=interaction['recommendation'],
                        evidence=interaction['mechanism'],
                        override_allowed=True
                    ))
                elif interaction['severity'] == 'MODERATE':
                    alerts.append(Alert(
                        severity='WARNING',
                        message=f"Moderate interaction: {new_med['name']} + {med['name']}",
                        recommendation=interaction['recommendation'],
                        evidence=interaction['mechanism'],
                        override_allowed=True
                    ))
        
        return alerts
    
    def check_renal_dosing(
        self,
        medication: dict,
        lab_results: List[dict],
        patient_age: int,
        patient_sex: str
    ) -> Optional[Alert]:
        """
        Adjust dosage based on kidney function (eGFR).

        patient_age and patient_sex are explicit parameters because the
        eGFR calculation needs them; callers must supply both.
        """
        # Find most recent creatinine
        recent_cr = next(
            (lab for lab in sorted(lab_results, key=lambda x: x['observed_at'], reverse=True)
             if lab['loinc_code'] == '2160-0'),  # Serum creatinine
            None
        )
        
        if not recent_cr:
            return Alert(
                severity='WARNING',
                message="No recent creatinine found",
                recommendation="Order serum creatinine before administering renally-cleared drug",
                evidence="Unable to assess renal function",
                override_allowed=True
            )
        
        # Calculate eGFR (simplified CKD-EPI equation)
        egfr = self.calculate_egfr(recent_cr['value_numeric'], patient_age, patient_sex)
        
        # Check if dose adjustment needed
        if medication['requires_renal_adjustment']:
            recommended_dose = self.get_renal_adjusted_dose(
                medication['rxnorm_code'],
                egfr
            )
            
            if recommended_dose != medication['dose']:
                return Alert(
                    severity='WARNING',
                    message=f"Renal dosing adjustment recommended (eGFR: {egfr} mL/min)",
                    recommendation=f"Reduce dose to {recommended_dose} {medication['dose_unit']}",
                    evidence=f"Based on eGFR {egfr} mL/min (Stage {self.ckd_stage(egfr)} CKD)",
                    override_allowed=True
                )
        
        return None
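
The renal check above calls `calculate_egfr` and `ckd_stage` without showing them. The following is a minimal sketch of what they might look like, written as standalone functions. It assumes the 2021 CKD-EPI creatinine equation, serum creatinine in mg/dL, and sex given as a string starting with "M" or "F". The function names and the rounding are illustrative choices, not taken from the original code, and any clinical use would need validation against a reference implementation.

```python
def calculate_egfr_ckd_epi_2021(creatinine_mg_dl: float, age_years: int, sex: str) -> float:
    """Estimate GFR in mL/min/1.73 m^2 with the CKD-EPI 2021 creatinine equation."""
    if creatinine_mg_dl <= 0:
        raise ValueError("creatinine must be positive")

    is_female = sex.strip().upper().startswith("F")
    kappa = 0.7 if is_female else 0.9          # sex-specific creatinine threshold
    alpha = -0.241 if is_female else -0.302    # exponent applied below the threshold

    ratio = creatinine_mg_dl / kappa
    egfr = 142 * min(ratio, 1.0) ** alpha * max(ratio, 1.0) ** -1.200 * 0.9938 ** age_years
    if is_female:
        egfr *= 1.012
    return round(egfr, 1)


def ckd_stage(egfr: float) -> str:
    """Map an eGFR value to the GFR category (G1 to G5)."""
    if egfr >= 90:
        return "G1"
    if egfr >= 60:
        return "G2"
    if egfr >= 45:
        return "G3a"
    if egfr >= 30:
        return "G3b"
    if egfr >= 15:
        return "G4"
    return "G5"


egfr = calculate_egfr_ckd_epi_2021(1.0, 50, "M")
print(egfr, ckd_stage(egfr))
```

Keeping the equation in a pure function makes it easy to unit test separately from the alerting logic.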

4. HL7 v2 — Legacy Messaging Standard

HL7 v2 (Health Level 7 version 2) is the most widely deployed messaging standard in healthcare IT. It dates back to the late 1980s, so it is nearly 40 years old and carries many quirks.

4.1 HL7 Message Structure

MSH|^~\&|SENDING_APP|SENDING_FACILITY|RECEIVING_APP|RECEIVING_FACILITY|20260417120000||ADT^A01|MSG00001|P|2.5
EVN|A01|20260417120000
PID|1||MRN123456^^^HOSPITAL^MR||Doe^John^A||19800101|M|||123 Main St^^Boston^MA^02101^USA||||||||123-45-6789
PV1|1|I|ICU^201^01^HOSPITAL||||DOC123^Smith^Jane|||ICU|||||||DOC123|Emergency|V123456||||||||||||||||||||HOSPITAL|||||20260417080000

Segment structure:
MSH = Message Header
EVN = Event Type
PID = Patient Identification
PV1 = Patient Visit

Anatomy of an HL7 message:

Segment|Field1|Field2^Component1^Component2|...

Delimiters:
| = Field separator
^ = Component separator (within field)
& = Subcomponent separator
~ = Field repeat separator
\ = Escape character
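
To make the delimiter hierarchy concrete, here is a small sketch that splits one raw field into repetitions, components, and subcomponents, and decodes the standard escape sequences (`\F\`, `\S\`, `\T\`, `\R\`, `\E\`). It assumes the default delimiters listed above; the function names `unescape` and `split_field` are made up for this illustration.

```python
import re
from typing import List

# Standard HL7 v2 escape sequences mapped to the default delimiters
ESCAPES = {"F": "|", "S": "^", "T": "&", "R": "~", "E": "\\"}


def unescape(value: str) -> str:
    """Replace escape sequences such as \\T\\ with the character they stand for."""
    return re.sub(r"\\([FSTRE])\\", lambda m: ESCAPES[m.group(1)], value)


def split_field(raw: str) -> List[List[List[str]]]:
    """Return repetitions -> components -> subcomponents for one field."""
    return [
        [[unescape(sub) for sub in component.split("&")] for component in repetition.split("^")]
        for repetition in raw.split("~")
    ]


# A PID-3 style field with two repetitions; the assigning authority has subcomponents
parsed = split_field(r"MRN123^^^HOSP&1.2.3&ISO^MR~SSN999^^^SSA^SS")
print(parsed[0][3])   # subcomponents of the first repetition's 4th component
print(unescape(r"Smith \T\ Sons"))
```

Splitting happens before unescaping, so an escaped delimiter inside a value is never mistaken for a real one.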

4.2 Common HL7 Message Types

ADT (Admit, Discharge, Transfer):
  A01 = Patient admission
  A02 = Patient transfer
  A03 = Patient discharge
  A04 = Patient registration
  A08 = Update patient information
  A11 = Cancel patient admit
  A31 = Update person information

ORM (Order Message):
  O01 = Order message
  O02 = Order response (sent as ORR^O02, not ORM)

ORU (Observation Result):
  R01 = Unsolicited observation message (lab results)

DFT (Detailed Financial Transaction):
  P03 = Post detail financial transaction

SIU (Scheduling Information Unsolicited):
  S12 = New appointment booking
  S13 = Appointment rescheduling
  S15 = Appointment cancellation
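
An interface usually routes each incoming message by the message code and trigger event in MSH-9. The sketch below shows one way to do that. The handler functions and the `HANDLERS` table are hypothetical placeholders, and only the first segment is inspected.

```python
from typing import Callable, Dict, Tuple


def message_type(msh_segment: str) -> Tuple[str, str]:
    """Return (message code, trigger event) from MSH-9, e.g. ('ADT', 'A01')."""
    field_sep = msh_segment[3]
    component_sep = msh_segment[4]
    fields = msh_segment.split(field_sep)
    # After splitting, index 8 is MSH-9 because MSH-1 is the separator itself
    code, _, rest = fields[8].partition(component_sep)
    trigger = rest.split(component_sep)[0]
    return code, trigger


def handle_admit(message: str) -> str:      # hypothetical handler
    return "admit"


def handle_lab_result(message: str) -> str:  # hypothetical handler
    return "lab-result"


HANDLERS: Dict[Tuple[str, str], Callable[[str], str]] = {
    ("ADT", "A01"): handle_admit,
    ("ORU", "R01"): handle_lab_result,
}


def dispatch(message: str) -> str:
    """Route a message to its handler, or report it as unhandled."""
    normalized = message.strip().replace("\r\n", "\r").replace("\n", "\r")
    msh = normalized.split("\r")[0]
    handler = HANDLERS.get(message_type(msh))
    return handler(message) if handler else "unhandled"


sample = "MSH|^~\\&|HIS|HOSPITAL|LAB|LABSYS|20260417120000||ADT^A01|MSG00001|P|2.5\rEVN|A01|20260417120000"
print(dispatch(sample))
```

Unknown message types are reported rather than silently dropped, which lets the caller reply with an AR or AE acknowledgment.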

4.3 HL7 Message Parsing and Generation

from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class HL7Segment:
    segment_type: str
    fields: List[str]
    
    def get_field(self, index: int) -> str:
        """Fields are 1-indexed in HL7."""
        if index == 0:
            return self.segment_type
        try:
            return self.fields[index - 1]
        except IndexError:
            return ""
    
    def get_component(self, field_index: int, component_index: int) -> str:
        """Extract component from composite field."""
        field = self.get_field(field_index)
        components = field.split('^')
        try:
            return components[component_index]
        except IndexError:
            return ""

class HL7Message:
    """
    HL7 v2.x message parser and generator.
    """
    
    def __init__(self, message_text: str = None):
        self.segments: List[HL7Segment] = []
        self.field_separator = '|'
        self.encoding_chars = '^~\\&'
        
        if message_text:
            self.parse(message_text)
    
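    def parse(self, message_text: str):
        """Parse HL7 message into segments."""
        # HL7 v2 terminates segments with a carriage return; also accept LF and CRLF
        normalized = message_text.strip().replace('\r\n', '\n').replace('\r', '\n')
        lines = normalized.split('\n')
        
        for line in lines:
            if not line.strip():
                continue
            
            # MSH segment is special: MSH-1 is the field separator itself (position 3)
            # and MSH-2 holds the encoding characters
            if line.startswith('MSH'):
                segment_type = 'MSH'
                self.field_separator = line[3]
                self.encoding_chars = line[4:8]
                # Keep MSH-1 and MSH-2 in the list so get_field(n) returns MSH-n
                fields = [self.field_separator, self.encoding_chars]
                fields += line[9:].split(self.field_separator)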
            else:
                parts = line.split(self.field_separator)
                segment_type = parts[0]
                fields = parts[1:]
            
            self.segments.append(HL7Segment(segment_type, fields))
    
    def get_segment(self, segment_type: str, occurrence: int = 0) -> HL7Segment:
        """Get segment by type (e.g., 'PID', 'PV1')."""
        segments = [s for s in self.segments if s.segment_type == segment_type]
        try:
            return segments[occurrence]
        except IndexError:
            return None
    
    def extract_patient_info(self) -> Dict:
        """Extract patient demographics from PID segment."""
        pid = self.get_segment('PID')
        if not pid:
            return None
        
        # PID-2: Patient ID (deprecated, use PID-3)
        # PID-3: Patient Identifier List (MRN)
        patient_id_field = pid.get_field(3)
        mrn = patient_id_field.split('^')[0]
        
        # PID-5: Patient Name (Last^First^Middle)
        name_field = pid.get_field(5)
        name_parts = name_field.split('^')
        last_name = name_parts[0] if len(name_parts) > 0 else ""
        first_name = name_parts[1] if len(name_parts) > 1 else ""
        middle_name = name_parts[2] if len(name_parts) > 2 else ""
        
        # PID-7: Date of Birth (YYYYMMDD)
        dob_str = pid.get_field(7)
        dob = datetime.strptime(dob_str, '%Y%m%d').date() if dob_str else None
        
        # PID-8: Sex
        sex = pid.get_field(8)
        
        # PID-11: Patient Address
        address_field = pid.get_field(11)
        address_parts = address_field.split('^')
        street = address_parts[0] if len(address_parts) > 0 else ""
        city = address_parts[2] if len(address_parts) > 2 else ""
        state = address_parts[3] if len(address_parts) > 3 else ""
        zip_code = address_parts[4] if len(address_parts) > 4 else ""
        
        # PID-13: Phone Number
        phone = pid.get_field(13)
        
        # PID-19: SSN
        ssn = pid.get_field(19)
        
        return {
            'mrn': mrn,
            'first_name': first_name,
            'last_name': last_name,
            'middle_name': middle_name,
            'date_of_birth': dob,
            'sex': sex,
            'address': {
                'street': street,
                'city': city,
                'state': state,
                'zip': zip_code
            },
            'phone': phone,
            'ssn': ssn
        }
    
    def extract_lab_results(self) -> List[Dict]:
        """Extract lab results from OBX segments in ORU message."""
        results = []
        
        obx_segments = [s for s in self.segments if s.segment_type == 'OBX']
        
        for obx in obx_segments:
            # OBX-2: Value Type ('NM' = Numeric, 'ST' = String, 'TX' = Text)
            value_type = obx.get_field(2)
            
            # OBX-3: Observation Identifier (LOINC code)
            obs_id_field = obx.get_field(3)
            obs_id_parts = obs_id_field.split('^')
            loinc_code = obs_id_parts[0]
            test_name = obs_id_parts[1] if len(obs_id_parts) > 1 else ""
            
            # OBX-5: Observation Value
            value = obx.get_field(5)
            
            # OBX-6: Units
            units = obx.get_field(6)
            
            # OBX-7: Reference Range
            reference_range = obx.get_field(7)
            
            # OBX-8: Abnormal Flags ('H' = High, 'L' = Low, 'N' = Normal)
            abnormal_flag = obx.get_field(8)
            
            # OBX-11: Observation Result Status
            # 'F' = Final, 'P' = Preliminary, 'C' = Corrected
            status = obx.get_field(11)
            
            # OBX-14: Date/Time of Observation
            observed_datetime = obx.get_field(14)
            
            results.append({
                'loinc_code': loinc_code,
                'test_name': test_name,
                'value': value,
                'value_type': value_type,
                'units': units,
                'reference_range': reference_range,
                'is_abnormal': abnormal_flag in ['H', 'L', 'HH', 'LL'],
                'abnormal_flag': abnormal_flag,
                'status': status,
                'observed_at': observed_datetime
            })
        
        return results
    
    def generate_ack(self, original_message: 'HL7Message', ack_code: str = 'AA') -> str:
        """
        Generate ACK (acknowledgment) message.
        
        ack_code:
          AA = Application Accept
          AE = Application Error
          AR = Application Reject
        """
        msh = original_message.get_segment('MSH')
        
        # Swap sender/receiver
        sending_app = msh.get_field(3)
        sending_facility = msh.get_field(4)
        receiving_app = msh.get_field(5)
        receiving_facility = msh.get_field(6)
        message_control_id = msh.get_field(10)
        
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        
        ack_message = (
            f"MSH|^~\\&|{receiving_app}|{receiving_facility}|{sending_app}|{sending_facility}|{timestamp}||ACK|ACK{message_control_id}|P|2.5\r"
            f"MSA|{ack_code}|{message_control_id}\r"
        )
        
        return ack_message

# Example usage
hl7_adt_a01 = """
MSH|^~\\&|HIS|HOSPITAL|LAB|LABSYS|20260417120000||ADT^A01|MSG00001|P|2.5
EVN|A01|20260417120000
PID|1||MRN123456^^^HOSPITAL^MR||Nguyen^Van^A||19850315|M|||45 Le Loi St^^HCMC^SGN^700000^VN||||||||123456789
PV1|1|I|ICU^201^01||||DOC001^Tran^Thi^Bich|||ICU|||||||DOC001|Emergency||||||||||||||||||||HOSPITAL|||||20260417080000
"""

parser = HL7Message(hl7_adt_a01)
patient = parser.extract_patient_info()
print(f"Patient: {patient['first_name']} {patient['last_name']}, MRN: {patient['mrn']}")

# Generate ACK
ack = parser.generate_ack(parser, 'AA')
print(ack)
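
On the wire, HL7 v2 messages and their ACKs are commonly exchanged over MLLP (Minimal Lower Layer Protocol), which wraps each message in a start byte (0x0B) and an end sequence (0x1C 0x0D). The sketch below shows only the framing. The helper names are invented for illustration, and socket handling, timeouts, and partial reads are deliberately left out.

```python
START_BLOCK = b"\x0b"       # vertical tab marks the start of a message
END_BLOCK = b"\x1c"         # file separator marks the end of a message
CARRIAGE_RETURN = b"\x0d"   # always follows the end block


def mllp_wrap(message: str, encoding: str = "utf-8") -> bytes:
    """Frame an HL7 message for sending over an MLLP connection."""
    return START_BLOCK + message.encode(encoding) + END_BLOCK + CARRIAGE_RETURN


def mllp_unwrap(frame: bytes, encoding: str = "utf-8") -> str:
    """Strip MLLP framing from one complete frame and return the HL7 text."""
    if not (frame.startswith(START_BLOCK) and frame.endswith(END_BLOCK + CARRIAGE_RETURN)):
        raise ValueError("not a complete MLLP frame")
    return frame[len(START_BLOCK):-2].decode(encoding)


ack_text = "MSH|^~\\&|LAB|LABSYS|HIS|HOSPITAL|20260417120001||ACK|ACKMSG00001|P|2.5\rMSA|AA|MSG00001\r"
frame = mllp_wrap(ack_text)
print(frame[:1], frame[-2:])
print(mllp_unwrap(frame) == ack_text)
```

A real receiver would buffer incoming bytes until it sees the end sequence, because one TCP read may hold a partial frame or several frames.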

4.4 Integration Engine (Mirth Connect)

// Mirth Connect JavaScript transformer
// Convert HL7 ADT^A01 to FHIR Patient resource

// Input: HL7 ADT message
// Output: FHIR Patient JSON

var pid = msg['PID'];
var pv1 = msg['PV1'];

// Extract patient demographics
var mrn = pid['PID.3']['PID.3.1'].toString();
var lastName = pid['PID.5']['PID.5.1'].toString();
var firstName = pid['PID.5']['PID.5.2'].toString();
var dob = pid['PID.7']['PID.7.1'].toString(); // YYYYMMDD
var gender = pid['PID.8']['PID.8.1'].toString(); // M/F

// Transform gender code
var fhirGender = gender === 'M' ? 'male' : gender === 'F' ? 'female' : 'unknown';

// Format date of birth (YYYYMMDD → YYYY-MM-DD)
var formattedDob = dob.substring(0, 4) + '-' + dob.substring(4, 6) + '-' + dob.substring(6, 8);

// Build FHIR Patient resource
var fhirPatient = {
    "resourceType": "Patient",
    "identifier": [
        {
            "system": "urn:oid:2.16.840.1.113883.19.5",
            "value": mrn,
            "type": {
                "coding": [
                    {
                        "system": "http://terminology.hl7.org/CodeSystem/v2-0203",
                        "code": "MR",
                        "display": "Medical Record Number"
                    }
                ]
            }
        }
    ],
    "name": [
        {
            "use": "official",
            "family": lastName,
            "given": [firstName]
        }
    ],
    "gender": fhirGender,
    "birthDate": formattedDob,
    "active": true
};

// Output transformed message
return JSON.stringify(fhirPatient, null, 2);
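
The transformer above assumes PID-7 always holds a full eight-digit date and that sex is either M or F. The Python sketch below, which runs outside Mirth, shows a slightly more defensive version of the two conversions. It accepts partial dates (YYYY or YYYYMM), ignores any time portion, and maps the HL7 code O to FHIR `other`. The function names are illustrative.

```python
from datetime import datetime
from typing import Optional

# HL7 administrative sex codes with a direct FHIR equivalent; anything else becomes "unknown"
GENDER_MAP = {"M": "male", "F": "female", "O": "other"}


def hl7_date_to_fhir(value: str) -> Optional[str]:
    """Convert YYYY[MM[DD[HHMM...]]] to a FHIR date, keeping the original precision."""
    digits = value.strip()[:8]  # drop any time portion
    if not digits:
        return None
    if not digits.isdigit() or len(digits) not in (4, 6, 8):
        raise ValueError(f"unsupported HL7 date: {value!r}")
    if len(digits) == 8:
        datetime.strptime(digits, "%Y%m%d")  # raises ValueError for impossible dates
    elif len(digits) == 6 and not 1 <= int(digits[4:6]) <= 12:
        raise ValueError(f"invalid month in HL7 date: {value!r}")
    parts = [digits[0:4], digits[4:6], digits[6:8]]
    return "-".join(part for part in parts if part)


def hl7_gender_to_fhir(code: str) -> str:
    """Map an HL7 administrative sex code to a FHIR gender code."""
    return GENDER_MAP.get(code.strip().upper(), "unknown")


print(hl7_date_to_fhir("19850315"), hl7_date_to_fhir("198503"), hl7_gender_to_fhir("F"))
```

Returning `None` for an empty date lets the caller omit `birthDate` from the resource instead of emitting an invalid value such as `--`.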

5. FHIR — Fast Healthcare Interoperability Resources

FHIR (pronounced "fire") is the modern standard designed to succeed HL7 v2. It uses a RESTful API, JSON/XML, and a resource-based model.

5.1 FHIR Core Concepts

Resource: the basic unit of data (Patient, Observation, Medication, etc.)

{
  "resourceType": "Patient",
  "id": "example",
  "meta": {
    "versionId": "1",
    "lastUpdated": "2026-04-17T10:30:00Z"
  },
  "identifier": [
    {
      "system": "http://hospital.org/mrn",
      "value": "MRN123456"
    }
  ],
  "active": true,
  "name": [
    {
      "use": "official",
      "family": "Nguyen",
      "given": ["Van", "A"]
    }
  ],
  "gender": "male",
  "birthDate": "1985-03-15",
  "address": [
    {
      "use": "home",
      "line": ["45 Le Loi Street"],
      "city": "Ho Chi Minh City",
      "state": "SGN",
      "postalCode": "700000",
      "country": "VN"
    }
  ],
  "telecom": [
    {
      "system": "phone",
      "value": "+84-28-1234-5678",
      "use": "home"
    },
    {
      "system": "email",
      "value": "patient@example.com",
      "use": "home"
    }
  ]
}

Common FHIR Resources:

  • Patient: Demographics and administrative information
  • Practitioner: Healthcare professional (doctor, nurse, etc.)
  • Observation: Measurements and simple assertions (vitals, lab tests)
  • Condition: Clinical conditions, problems, diagnoses
  • Procedure: Performed procedures
  • MedicationRequest: Prescription or order
  • MedicationAdministration: Record of medication given
  • Encounter: Interaction between patient and healthcare provider
  • AllergyIntolerance: Allergy or adverse reaction risk
  • DiagnosticReport: Lab report, radiology report
  • DocumentReference: Clinical document (CDA, PDF)

5.2 FHIR RESTful API

# Create (POST)
POST /fhir/Patient HTTP/1.1
Host: api.hospital.com
Content-Type: application/fhir+json
Authorization: Bearer <token>

{
  "resourceType": "Patient",
  "name": [{"family": "Nguyen", "given": ["Van", "A"]}],
  "gender": "male",
  "birthDate": "1985-03-15"
}

# Read (GET)
GET /fhir/Patient/123 HTTP/1.1

# Update (PUT)
PUT /fhir/Patient/123 HTTP/1.1
Content-Type: application/fhir+json

{
  "resourceType": "Patient",
  "id": "123",
  ...
}

# Delete (DELETE)
DELETE /fhir/Patient/123 HTTP/1.1

# Search (GET with query params)
GET /fhir/Patient?family=Nguyen&birthdate=1985-03-15 HTTP/1.1

# Search with POST (for complex queries)
POST /fhir/Patient/_search HTTP/1.1
Content-Type: application/x-www-form-urlencoded

family=Nguyen&birthdate=ge1980-01-01

FHIR Search Parameters:

# Common search parameters
_id=123                          # by ID
_lastUpdated=gt2026-01-01        # updated after date
_tag=http://example.org|vip      # by tag
_profile=http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient  # by profile

# Resource-specific parameters (Patient)
family=Nguyen                    # family name
given=Van                        # given name
birthdate=1985-03-15            # exact birthdate
birthdate=ge1980                # greater than or equal
gender=male                      # gender
identifier=MRN123456            # identifier value

# Modifiers
family:exact=Nguyen             # exact match (case sensitive)
name:contains=nguyen            # substring match

# Comparison prefix on a date value
birthdate=le2000-12-31          # less than or equal

# Reverse chaining (_has)
_has:Observation:patient:code=http://loinc.org|8480-6
  # Patients that have a systolic blood pressure observation

# Prefixes for numbers and dates
eq = equal
ne = not equal
gt = greater than
lt = less than
ge = greater than or equal
le = less than or equal
sa = starts after
eb = ends before
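
A date range is expressed by repeating the same parameter with different prefixes, and the `system|code` token needs URL encoding. Here is a short sketch of building such a query string with the standard library. The base URL is the placeholder host used elsewhere in this section, and `build_search_url` is a name chosen for this example.

```python
from typing import Dict, List, Union
from urllib.parse import urlencode

ParamValue = Union[str, List[str]]


def build_search_url(base_url: str, resource_type: str, params: Dict[str, ParamValue]) -> str:
    """Build a FHIR search URL; list values become repeated parameters (AND semantics)."""
    query = urlencode(params, doseq=True)
    return f"{base_url.rstrip('/')}/{resource_type}?{query}"


url = build_search_url(
    "https://api.hospital.com/fhir",
    "Observation",
    {
        "patient": "123",
        "code": "http://loinc.org|8480-6",          # system|code token
        "date": ["ge2026-01-01", "le2026-04-17"],   # both conditions must hold
    },
)
print(url)
```

Putting both prefixes into a single string value would not work, because the `&` and `=` would be percent-encoded and sent as one literal value.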

5.3 FHIR Client Implementation

import requests
from typing import List, Dict, Optional
from datetime import datetime
import json

class FHIRClient:
    """
    FHIR R4 client implementation.
    Supports SMART on FHIR authentication.
    """
    
    def __init__(self, base_url: str, access_token: Optional[str] = None):
        self.base_url = base_url.rstrip('/')
        self.access_token = access_token
        self.session = requests.Session()
        
        # Always negotiate FHIR JSON; add the bearer token only when one is provided
        self.session.headers.update({
            'Accept': 'application/fhir+json',
            'Content-Type': 'application/fhir+json'
        })
        if access_token:
            self.session.headers['Authorization'] = f'Bearer {access_token}'
    
    def create(self, resource: Dict) -> Dict:
        """Create a new resource (POST)."""
        resource_type = resource['resourceType']
        url = f"{self.base_url}/{resource_type}"
        
        response = self.session.post(url, json=resource)
        response.raise_for_status()
        
        return response.json()
    
    def read(self, resource_type: str, resource_id: str) -> Dict:
        """Read a resource by ID (GET)."""
        url = f"{self.base_url}/{resource_type}/{resource_id}"
        
        response = self.session.get(url)
        response.raise_for_status()
        
        return response.json()
    
    def update(self, resource: Dict) -> Dict:
        """Update an existing resource (PUT)."""
        resource_type = resource['resourceType']
        resource_id = resource['id']
        url = f"{self.base_url}/{resource_type}/{resource_id}"
        
        response = self.session.put(url, json=resource)
        response.raise_for_status()
        
        return response.json()
    
    def delete(self, resource_type: str, resource_id: str) -> bool:
        """Delete a resource (DELETE)."""
        url = f"{self.base_url}/{resource_type}/{resource_id}"
        
        response = self.session.delete(url)
        response.raise_for_status()
        
        # Servers may answer a successful delete with 200, 202, or 204
        return response.status_code in (200, 202, 204)
    
    def search(self, resource_type: str, params: Dict) -> List[Dict]:
        """
        Search for resources with query parameters.
        Returns list of matching resources.
        """
        url = f"{self.base_url}/{resource_type}"
        
        response = self.session.get(url, params=params)
        response.raise_for_status()
        
        bundle = response.json()
        
        # Extract resources from Bundle
        resources = []
        if bundle.get('entry'):
            resources = [entry['resource'] for entry in bundle['entry']]
        
        return resources
    
    def search_by_patient(self, resource_type: str, patient_id: str) -> List[Dict]:
        """Search for resources associated with a patient."""
        return self.search(resource_type, {'patient': patient_id})
    
    def get_patient_observations(
        self,
        patient_id: str,
        code: Optional[str] = None,
        date_range: Optional[tuple] = None
    ) -> List[Dict]:
        """
        Get observations for a patient with optional filtering.
        
        Args:
            patient_id: Patient resource ID
            code: LOINC code (e.g., '8480-6' for systolic BP)
            date_range: Tuple of (start_date, end_date)
        """
        params = {'patient': patient_id}
        
        if code:
            params['code'] = f"http://loinc.org|{code}"
        
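        if date_range:
            start, end = date_range
            # A list value makes requests send the parameter twice
            # (date=ge...&date=le...), which FHIR treats as a logical AND
            params['date'] = [f"ge{start}", f"le{end}"]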
        
        # Sort by date descending
        params['_sort'] = '-date'
        
        return self.search('Observation', params)
    
    def create_observation(
        self,
        patient_id: str,
        loinc_code: str,
        display: str,
        value: float,
        unit: str,
        performer_id: str
    ) -> Dict:
        """
        Create a new observation (e.g., vital sign, lab result).
        """
        observation = {
            "resourceType": "Observation",
            "status": "final",
            "category": [{
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/observation-category",
                    "code": "vital-signs",
                    "display": "Vital Signs"
                }]
            }],
            "code": {
                "coding": [{
                    "system": "http://loinc.org",
                    "code": loinc_code,
                    "display": display
                }]
            },
            "subject": {
                "reference": f"Patient/{patient_id}"
            },
            "performer": [{
                "reference": f"Practitioner/{performer_id}"
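            }],
            # A FHIR dateTime that includes a time must carry a timezone offset
            "effectiveDateTime": datetime.now().astimezone().isoformat(),
            "valueQuantity": {
                "value": value,
                "unit": unit,
                "system": "http://unitsofmeasure.org",
                # Assumes `unit` is already a valid UCUM code (e.g. 'mm[Hg]')
                "code": unit
            }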
        }
        
        return self.create(observation)


# Example usage
fhir_client = FHIRClient(
    base_url='https://api.hospital.com/fhir',
    access_token='your_access_token_here'
)

# Create a blood pressure observation
bp_obs = fhir_client.create_observation(
    patient_id='123',
    loinc_code='8480-6',
    display='Systolic blood pressure',
    value=120,
    unit='mm[Hg]',  # UCUM code for millimetres of mercury
    performer_id='doctor-456'
)

# Search for all observations for a patient
observations = fhir_client.get_patient_observations(
    patient_id='123',
    date_range=('2026-01-01', '2026-04-17')
)

print(f"Found {len(observations)} observations")
for obs in observations:
    code = obs['code']['coding'][0]['display']
    value = obs.get('valueQuantity', {}).get('value', 'N/A')
    unit = obs.get('valueQuantity', {}).get('unit', '')
    print(f"  {code}: {value} {unit}")
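
FHIR servers usually page search results, so the first Bundle may hold only part of the matches. The next page is advertised in `Bundle.link` with `relation` set to `next`. The sketch below follows those links. The `fetch` callable is an assumption made here so the logic stays independent of any HTTP library. With the client above it could be something like `lambda url: fhir_client.session.get(url).json()`.

```python
from typing import Callable, Dict, Iterator, Optional


def next_link(bundle: Dict) -> Optional[str]:
    """Return the URL of the next page, or None on the last page."""
    for link in bundle.get("link", []):
        if link.get("relation") == "next":
            return link.get("url")
    return None


def iter_search_results(
    fetch: Callable[[str], Dict],
    first_url: str,
    max_pages: int = 100,
) -> Iterator[Dict]:
    """Yield every resource across all pages, stopping after max_pages as a safety limit."""
    url: Optional[str] = first_url
    for _ in range(max_pages):
        if url is None:
            return
        bundle = fetch(url)
        for entry in bundle.get("entry", []):
            if "resource" in entry:
                yield entry["resource"]
        url = next_link(bundle)


# In-memory stand-in for a server that returns two pages
pages = {
    "page1": {"entry": [{"resource": {"id": "a"}}], "link": [{"relation": "next", "url": "page2"}]},
    "page2": {"entry": [{"resource": {"id": "b"}}], "link": [{"relation": "self", "url": "page2"}]},
}
print([resource["id"] for resource in iter_search_results(pages.__getitem__, "page1")])
```

The page limit guards against a misbehaving server that keeps returning a `next` link.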

5.4 FHIR Bundles — Transaction and Batch Operations

import uuid

def create_patient_with_observations(
    patient_data: Dict,
    observations: List[Dict]
) -> Dict:
    """
    Use FHIR Bundle to create patient and related observations atomically.
    """
    # A urn:uuid: fullUrl must contain a real UUID; other entries reference it
    # and the server rewrites those references to the assigned Patient ID
    patient_full_url = f"urn:uuid:{uuid.uuid4()}"
    
    bundle = {
        "resourceType": "Bundle",
        "type": "transaction",
        "entry": []
    }
    
    # Add patient as first entry
    bundle['entry'].append({
        "fullUrl": patient_full_url,
        "resource": patient_data,
        "request": {
            "method": "POST",
            "url": "Patient"
        }
    })
    
    # Add observations referencing the temporary patient ID
    for obs in observations:
        # Copy instead of mutating the caller's dict
        obs = {**obs, "subject": {"reference": patient_full_url}}
        bundle['entry'].append({
            "fullUrl": f"urn:uuid:{uuid.uuid4()}",
            "resource": obs,
            "request": {
                "method": "POST",
                "url": "Observation"
            }
        })
    
    # POST bundle to server (all-or-nothing transaction)
    response = requests.post(
        'https://api.hospital.com/fhir',
        json=bundle,
        headers={
            'Content-Type': 'application/fhir+json',
            'Authorization': 'Bearer <token>'
        }
    )
    
    # Surface HTTP errors; a failed transaction creates nothing
    response.raise_for_status()
    return response.json()
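
The server answers a transaction with a Bundle of type `transaction-response`, where each entry carries a `response.status` and, for created resources, a `response.location` such as `Patient/123/_history/1`. The sketch below pulls the assigned IDs out of that response. The helper name and the sample payload are made up for illustration.

```python
from typing import Dict, List, Tuple


def created_ids(response_bundle: Dict) -> List[Tuple[str, str]]:
    """Return (resource type, id) for every entry the server reports as created."""
    results: List[Tuple[str, str]] = []
    for entry in response_bundle.get("entry", []):
        response = entry.get("response", {})
        if not response.get("status", "").startswith("201"):
            continue
        # The location may be relative or absolute and may end in /_history/<version>
        location = response.get("location", "").split("/_history/")[0].rstrip("/")
        parts = location.split("/")
        if len(parts) >= 2:
            results.append((parts[-2], parts[-1]))
    return results


sample_response = {
    "resourceType": "Bundle",
    "type": "transaction-response",
    "entry": [
        {"response": {"status": "201 Created", "location": "Patient/123/_history/1"}},
        {"response": {"status": "201 Created", "location": "Observation/456/_history/1"}},
    ],
}
print(created_ids(sample_response))
```

Entries appear in the same order as in the request, so the first tuple corresponds to the Patient that was submitted first.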

6. Data Security — Encryption, Access Control, De-identification

6.1 De-identification for Research

HIPAA Safe Harbor method — remove 18 identifiers:

import re
from typing import Dict, List
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

class PHIDeidentifier:
    """
    De-identify PHI for research purposes.
    
    Two methods:
    1. Safe Harbor: Remove 18 HIPAA identifiers
    2. Expert Determination: Statistical guarantee of re-identification risk < threshold
    """
    
    def __init__(self):
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()
    
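    def remove_identifiers_safe_harbor(self, text: str) -> str:
        """
        Redact the Safe Harbor identifiers that can be found by pattern matching.

        Covers names, dates, phone numbers, emails, SSNs, MRNs, IP addresses,
        URLs, and zip codes. The remaining Safe Harbor categories (for example
        account numbers, device identifiers, biometric data, photographs) are
        not handled here and need separate treatment.
        """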
        # Name detection
        text = self._redact_names(text)
        
        # Dates (except year)
        text = self._generalize_dates(text)
        
        # Phone numbers
        text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
        
        # Email addresses
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]', text)
        
        # SSN
        text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
        
        # MRN (assuming format MRN######)
        text = re.sub(r'\bMRN\d+\b', '[MRN]', text, flags=re.IGNORECASE)
        
        # IP addresses
        text = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', '[IP]', text)
        
        # URLs
        text = re.sub(r'https?://[^\s]+', '[URL]', text)
        
        # Geographic subdivisions smaller than state (zip codes)
        text = self._generalize_zip_codes(text)
        
        return text
    
    def _generalize_dates(self, text: str) -> str:
        """
        Replace specific dates with just year (except for those > 89 years old).
        """
        # Match dates in various formats
        date_pattern = r'\b(\d{1,2}[-/]\d{1,2}[-/])(\d{4})\b'
        
        def replace_date(match):
            year = match.group(2)
            # For patients > 89 years old, even year should be redacted
            if int(year) < 1937:  # 2026 - 89 = 1937
                return '[DATE]'
            return year
        
        return re.sub(date_pattern, replace_date, text)
    
    def _generalize_zip_codes(self, text: str) -> str:
        """
        Replace zip codes with first 3 digits only (unless population < 20,000).
        """
        def replace_zip(match):
            zip_code = match.group()
            # In production, check census data for population
            # If population < 20,000, return '000'
            return zip_code[:3] + '00'
        
        return re.sub(r'\b\d{5}(?:-\d{4})?\b', replace_zip, text)
    
    def _redact_names(self, text: str) -> str:
        """Use NER model to detect and redact names."""
        analyzer_results = self.analyzer.analyze(
            text=text,
            entities=["PERSON"],
            language='en'
        )
        
        anonymized_text = self.anonymizer.anonymize(
            text=text,
            analyzer_results=analyzer_results
        )
        
        return anonymized_text.text
    
    def k_anonymize(self, dataframe, quasi_identifiers: List[str], k: int = 5):
        """
        K-anonymization: Ensure each combination of quasi-identifiers
        appears at least k times in dataset.
        
        Args:
            dataframe: pandas DataFrame with patient data
            quasi_identifiers: columns that could re-identify (age, zip, gender)
            k: minimum group size
        """
        # Group by quasi-identifiers
        grouped = dataframe.groupby(quasi_identifiers)
        
        # Suppress or generalize groups smaller than k
        filtered = grouped.filter(lambda x: len(x) >= k)
        
        return filtered
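
The `k_anonymize` method above only suppresses small groups. In practice, quasi-identifiers are usually generalized first (ages into bands, zip codes into prefixes) so that fewer rows have to be dropped. The sketch below shows that idea in plain Python without pandas. The record layout (`age`, `zip`, `gender`) and the ten-year bands are assumptions chosen for this example.

```python
from collections import Counter
from typing import Dict, List, Tuple

Record = Dict[str, object]


def generalize(record: Record) -> Tuple[str, str, str]:
    """Coarsen quasi-identifiers: 10-year age band, 3-digit zip prefix, gender."""
    age = int(record["age"])
    low = (age // 10) * 10
    age_band = "90+" if age >= 90 else f"{low}-{low + 9}"
    return (age_band, str(record["zip"])[:3], str(record["gender"]))


def smallest_group(records: List[Record]) -> int:
    """Size of the smallest equivalence class; the dataset is k-anonymous for any k up to this."""
    counts = Counter(generalize(r) for r in records)
    return min(counts.values()) if counts else 0


def suppress_small_groups(records: List[Record], k: int) -> List[Record]:
    """Drop records whose generalized group has fewer than k members."""
    counts = Counter(generalize(r) for r in records)
    return [r for r in records if counts[generalize(r)] >= k]


patients = [
    {"age": 31, "zip": "70012", "gender": "M"},
    {"age": 35, "zip": "70034", "gender": "M"},
    {"age": 38, "zip": "70056", "gender": "M"},
    {"age": 62, "zip": "10001", "gender": "F"},
]
kept = suppress_small_groups(patients, k=3)
print(len(kept), smallest_group(kept))
```

Before generalization every row here is unique, so suppression alone would discard the whole dataset. After generalization only the single outlier is dropped.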

6.2 Audit Logging — Comprehensive Tracking

-- Audit log schema (immutable, append-only)
CREATE TABLE phi_access_log (
    log_id              BIGSERIAL PRIMARY KEY,
    event_id            UUID UNIQUE NOT NULL DEFAULT gen_random_uuid(),
    
    -- Who
    user_id             UUID NOT NULL,
    user_type           VARCHAR(50) NOT NULL,  -- 'PHYSICIAN', 'NURSE', 'ADMIN', 'SYSTEM'
    session_id          UUID NOT NULL,
    
    -- What
    action              VARCHAR(50) NOT NULL,  -- 'VIEW', 'CREATE', 'UPDATE', 'DELETE', 'EXPORT'
    resource_type       VARCHAR(100) NOT NULL, -- 'PATIENT_RECORD', 'LAB_RESULT', etc.
    resource_id         UUID NOT NULL,
    patient_id          UUID NOT NULL,  -- Denormalized for fast patient audit queries
    
    -- When
    timestamp           TIMESTAMPTZ NOT NULL DEFAULT now(),
    
    -- Where
    ip_address          INET NOT NULL,
    user_agent          TEXT,
    facility_id         UUID,
    department_id       UUID,
    workstation_id      VARCHAR(100),
    
    -- Why (optional but recommended)
    access_reason       TEXT,
    is_break_glass      BOOLEAN DEFAULT FALSE,
    
    -- What changed (for UPDATE/DELETE)
    old_value           JSONB,
    new_value           JSONB,
    changed_fields      TEXT[],
    
    -- Result
    was_successful      BOOLEAN NOT NULL,
    failure_reason      TEXT,
    
    -- Context
    correlation_id      UUID,  -- Link related actions (e.g., create encounter + observations)
    parent_log_id       BIGINT,  -- For nested operations
    
    CONSTRAINT chk_action CHECK (action IN (
        'VIEW', 'CREATE', 'UPDATE', 'DELETE', 'EXPORT',
        'SEARCH', 'PRINT', 'EMAIL', 'FAX', 'DOWNLOAD'
    ))
);

-- Indexes for common queries
CREATE INDEX idx_phi_access_log_patient ON phi_access_log(patient_id, timestamp DESC);
CREATE INDEX idx_phi_access_log_user ON phi_access_log(user_id, timestamp DESC);
CREATE INDEX idx_phi_access_log_timestamp ON phi_access_log(timestamp DESC);
CREATE INDEX idx_phi_access_log_break_glass ON phi_access_log(is_break_glass) WHERE is_break_glass = TRUE;

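-- Partitioning for performance (partition by month)
-- Note: this requires the parent table to be declared with
-- PARTITION BY RANGE (timestamp), and PostgreSQL then requires every
-- PRIMARY KEY / UNIQUE constraint to include the partition key (timestamp).
CREATE TABLE phi_access_log_2026_04 PARTITION OF phi_access_log
    FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');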

-- Retention policy: Keep audit logs for 6 years minimum (HIPAA requirement)
CREATE OR REPLACE FUNCTION archive_old_audit_logs() RETURNS void AS $$
BEGIN
    -- Move logs older than 6 years to archive storage
    INSERT INTO phi_access_log_archive
    SELECT * FROM phi_access_log
    WHERE timestamp < now() - INTERVAL '6 years';
    
    DELETE FROM phi_access_log
    WHERE timestamp < now() - INTERVAL '6 years';
END;
$$ LANGUAGE plpgsql;

Audit logging middleware:

from datetime import datetime
from functools import wraps
from flask import request, g, jsonify
import logging
import json

def audit_phi_access(action: str, resource_type: str):
    """
    Decorator for auditing PHI access in Flask routes.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Capture context before executing function
            user_id = g.current_user.id
            session_id = g.session_id
            ip_address = request.remote_addr
            user_agent = request.user_agent.string
            # Timezone-aware timestamp so the TIMESTAMPTZ column is unambiguous
            timestamp = datetime.now().astimezone()
            
            # Extract resource_id from route params
            resource_id = kwargs.get('id') or kwargs.get('patient_id')
            
            # Execute the actual function
            try:
                result = f(*args, **kwargs)
                was_successful = True
                failure_reason = None
            except Exception as e:
                was_successful = False
                failure_reason = str(e)
                raise
            finally:
                # Log to audit table (asynchronous to avoid blocking)
                audit_entry = {
                    'user_id': user_id,
                    'session_id': session_id,
                    'action': action,
                    'resource_type': resource_type,
                    'resource_id': resource_id,
                    'timestamp': timestamp.isoformat(),
                    'ip_address': ip_address,
                    'user_agent': user_agent,
                    'was_successful': was_successful,
                    'failure_reason': failure_reason
                }
                
                # Send to audit service (async queue)
                audit_queue.send_message(json.dumps(audit_entry))
                
                # Also log to structured logger for SIEM
                logging.info(
                    "PHI_ACCESS",
                    extra=audit_entry
                )
            
            return result
        
        return decorated_function
    return decorator

# Usage
@app.route('/api/patients/<patient_id>/records', methods=['GET'])
@audit_phi_access(action='VIEW', resource_type='PATIENT_RECORD')
def get_patient_records(patient_id):
    # Your logic here
    # Bind patient_id as a query parameter; never interpolate it into the SQL string
    records = db.query("SELECT * FROM records WHERE patient_id = %s", [patient_id])
    return jsonify(records)

6.3 Breach Detection and Response

from typing import List
from datetime import datetime, timedelta

class BreachDetector:
    """
    Detect potential HIPAA breaches from audit logs.
    
    Suspicious patterns:
    1. Unusual access volume
    2. Access outside normal working hours
    3. Access to VIP/celebrity patients
    4. Bulk exports
    5. Access from unusual locations
    6. Multiple failed access attempts
    """
    
    def detect_unusual_access_volume(self, user_id: str, threshold: int = 100):
        """
        Alert if user accesses more than threshold patients in 24 hours.
        """
        query = """
            SELECT COUNT(DISTINCT patient_id) as patient_count
            FROM phi_access_log
            WHERE user_id = %s
              AND timestamp > now() - INTERVAL '24 hours'
              AND was_successful = TRUE
        """
        
        result = db.query(query, [user_id])
        patient_count = result[0]['patient_count']
        
        if patient_count > threshold:
            self.raise_alert(
                severity='HIGH',
                message=f"User {user_id} accessed {patient_count} patients in 24h",
                recommendation="Investigate potential data exfiltration"
            )
    
    def detect_vip_access(self, user_id: str, patient_id: str) -> bool:
        """
        Check if user has legitimate reason to access VIP patient.
        """
        # Check if patient is VIP (db.query returns a list of rows)
        rows = db.query("SELECT is_vip FROM patients WHERE patient_id = %s", [patient_id])
        is_vip = bool(rows) and rows[0]['is_vip']
        
        if not is_vip:
            return True  # Not VIP, no special handling
        
        # Check if user is on patient's care team
        is_on_care_team = db.query("""
            SELECT 1 FROM care_team
            WHERE patient_id = %s AND user_id = %s
        """, [patient_id, user_id])
        
        if not is_on_care_team:
            self.raise_alert(
                severity='CRITICAL',
                message=f"Unauthorized access to VIP patient {patient_id}",
                recommendation="Immediate investigation required"
            )
            return False
        
        return True
    
    def detect_after_hours_access(self, user_id: str):
        """
        Alert on access outside expected working hours.
        """
        query = """
            SELECT *
            FROM phi_access_log
            WHERE user_id = %s
              AND timestamp > now() - INTERVAL '7 days'
              AND (
                  EXTRACT(HOUR FROM timestamp) < 6
                  OR EXTRACT(HOUR FROM timestamp) > 22
              )
              AND was_successful = TRUE
        """
        
        after_hours_logs = db.query(query, [user_id])
        
        if len(after_hours_logs) > 10:
            self.raise_alert(
                severity='MEDIUM',
                message=f"User {user_id} has {len(after_hours_logs)} after-hours accesses",
                recommendation="Verify if access was legitimate"
            )
    
    def mandatory_breach_notification(self, affected_patients: List[str]):
        """
        Trigger the breach notification process.
        
        HIPAA requirements:
        - Notify affected individuals within 60 days of discovery (any breach size)
        - Notify HHS (Department of Health and Human Services): alongside the
          individual notices for 500 or more individuals, annually for smaller breaches
        - Notify media if breach affects > 500 residents of a state
        - Maintain log of breaches affecting < 500 individuals
        """
        num_affected = len(affected_patients)
        
        # Individual notice is required regardless of breach size
        self.notify_affected_individuals(affected_patients)
        
        if num_affected >= 500:
            # Major breach: HHS (and possibly media) notified alongside individuals
            self.notify_hhs(affected_patients)
            self.notify_media_if_required(affected_patients)
        else:
            # Minor breach: log and aggregate for the annual report to HHS
            self.log_minor_breach(affected_patients)
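The thresholds in the notification docstring are easy to mix up: the HHS threshold counts everyone affected, while the media threshold is counted per state. A minimal sketch of that decision logic follows, assuming the caller already knows how many affected individuals live in each state; `NotificationPlan` and `plan_breach_notifications` are hypothetical names, and the sketch does not replace a legal review.

```python
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Dict, List


@dataclass
class NotificationPlan:
    individual_notice_deadline: date   # applies to every breach, any size
    hhs_notice_with_individuals: bool  # True when 500 or more are affected in total
    media_notice_states: List[str]     # states with more than 500 affected residents


def plan_breach_notifications(discovered_on: date,
                              affected_by_state: Dict[str, int]) -> NotificationPlan:
    """Derive notification obligations from the discovery date and per-state counts."""
    total = sum(affected_by_state.values())
    return NotificationPlan(
        # Outer limit is 60 calendar days after discovery
        individual_notice_deadline=discovered_on + timedelta(days=60),
        hhs_notice_with_individuals=total >= 500,
        media_notice_states=sorted(
            state for state, count in affected_by_state.items() if count > 500
        ),
    )


plan = plan_breach_notifications(date(2026, 4, 1), {"CA": 600, "NV": 40})
print(plan)
```

A breach spread thinly across states can cross the HHS threshold without triggering any media notice, which is why the two checks are kept separate.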

7. Compliance Monitoring — Continuous Compliance

from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum

class ComplianceStatus(Enum):
    COMPLIANT = "compliant"
    NON_COMPLIANT = "non_compliant"
    AT_RISK = "at_risk"
    UNKNOWN = "unknown"

@dataclass
class ComplianceCheck:
    check_id: str
    category: str  # 'TECHNICAL', 'ADMINISTRATIVE', 'PHYSICAL'
    description: str
    status: ComplianceStatus
    evidence: str
    last_verified: datetime
    next_verification_due: datetime
    responsible_party: str

class HIPAAComplianceMonitor:
    """
    Automated compliance monitoring system.
    
    Continuously verifies HIPAA controls are in place.
    """
    
    def run_all_checks(self) -> Dict[str, List[ComplianceCheck]]:
        """
        Run all compliance checks and return results.
        """
        results = {
            'TECHNICAL': [],
            'ADMINISTRATIVE': [],
            'PHYSICAL': []
        }
        
        # Technical safeguards
        results['TECHNICAL'].extend([
            self.check_encryption_at_rest(),
            self.check_encryption_in_transit(),
            self.check_access_controls(),
            self.check_audit_logging(),
            self.check_session_timeouts(),
            self.check_password_policies(),
            self.check_mfa_enabled(),
            self.check_vulnerability_scanning()
        ])
        
        # Administrative safeguards
        results['ADMINISTRATIVE'].extend([
            self.check_risk_assessment_current(),
            self.check_workforce_training_current(),
            self.check_baa_in_place(),
            self.check_incident_response_plan(),
            self.check_disaster_recovery_plan()
        ])
        
        # Physical safeguards
        results['PHYSICAL'].extend([
            self.check_facility_access_controls(),
            self.check_workstation_security(),
            self.check_device_encryption()
        ])
        
        return results
    
    def check_encryption_at_rest(self) -> ComplianceCheck:
        """Verify all PHI is encrypted at rest."""
        # Check database encryption status
        db_encrypted = self.verify_database_encryption()
        
        # Check file storage encryption (S3, etc.)
        storage_encrypted = self.verify_storage_encryption()
        
        # Check backup encryption
        backup_encrypted = self.verify_backup_encryption()
        
        all_encrypted = db_encrypted and storage_encrypted and backup_encrypted
        
        return ComplianceCheck(
            check_id='TECH-001',
            category='TECHNICAL',
            description='All PHI encrypted at rest with AES-256',
            status=ComplianceStatus.COMPLIANT if all_encrypted else ComplianceStatus.NON_COMPLIANT,
            evidence=f"DB: {db_encrypted}, Storage: {storage_encrypted}, Backup: {backup_encrypted}",
            last_verified=datetime.now(),
            next_verification_due=datetime.now() + timedelta(days=30),
            responsible_party='Security Team'
        )
    
    def check_audit_logging(self) -> ComplianceCheck:
        """Verify audit logs are being collected and retained."""
        # Check if audit logging is enabled
        logging_enabled = self.verify_audit_logging_enabled()
        
        # Check if logs are retained for 6 years
        retention_ok = self.verify_log_retention_policy()
        
        # Check if logs are tamper-proof (write-once storage)
        tamper_proof = self.verify_audit_log_integrity()
        
        # Check if logs are being monitored
        monitoring_active = self.verify_log_monitoring_active()
        
        compliant = all([logging_enabled, retention_ok, tamper_proof, monitoring_active])
        
        return ComplianceCheck(
            check_id='TECH-004',
            category='TECHNICAL',
            description='Comprehensive audit logging of PHI access',
            status=ComplianceStatus.COMPLIANT if compliant else ComplianceStatus.NON_COMPLIANT,
            evidence=f"Logging: {logging_enabled}, Retention: {retention_ok}, Tamper-proof: {tamper_proof}, Monitoring: {monitoring_active}",
            last_verified=datetime.now(),
            next_verification_due=datetime.now() + timedelta(days=7),
            responsible_party='Engineering Team'
        )
    
    def check_workforce_training_current(self) -> ComplianceCheck:
        """Verify all workforce members have current HIPAA training."""
        query = """
            SELECT
                COUNT(*) as total_employees,
                COUNT(CASE WHEN last_training_date > now() - INTERVAL '1 year' THEN 1 END) as trained
            FROM employees
            WHERE has_phi_access = TRUE
        """
        
        result = db.query(query)[0]
        total = result['total_employees']
        trained = result['trained']
        compliance_rate = (trained / total * 100) if total > 0 else 0
        
        # Require 100% compliance
        compliant = compliance_rate == 100
        
        return ComplianceCheck(
            check_id='ADMIN-002',
            category='ADMINISTRATIVE',
            description='Annual HIPAA training for all workforce members',
            status=ComplianceStatus.COMPLIANT if compliant else ComplianceStatus.NON_COMPLIANT,
            evidence=f"{trained}/{total} employees trained ({compliance_rate:.1f}%)",
            last_verified=datetime.now(),
            next_verification_due=datetime.now() + timedelta(days=30),
            responsible_party='HR & Compliance'
        )
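The per-category results are most useful once they are rolled up into something a dashboard or alert can consume. The sketch below shows one possible roll-up, assuming a trimmed stand-in called `CheckResult` (hypothetical, carrying only the fields needed here) instead of the full `ComplianceCheck`.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List


@dataclass
class CheckResult:
    # Trimmed stand-in for ComplianceCheck with only the fields used here
    check_id: str
    compliant: bool
    next_verification_due: datetime


def summarize(results: Dict[str, List[CheckResult]],
              now: datetime) -> Dict[str, Dict[str, List[str]]]:
    """Per category, list the IDs of failing checks and of checks past their due date."""
    summary = {}
    for category, checks in results.items():
        summary[category] = {
            "failing": [c.check_id for c in checks if not c.compliant],
            "overdue": [c.check_id for c in checks if c.next_verification_due < now],
        }
    return summary


now = datetime(2026, 4, 19)
results = {
    "TECHNICAL": [
        CheckResult("TECH-001", True, datetime(2026, 5, 1)),
        CheckResult("TECH-004", False, datetime(2026, 4, 10)),
    ],
    "ADMINISTRATIVE": [CheckResult("ADMIN-002", True, datetime(2026, 4, 1))],
}
print(summarize(results, now))
```

Tracking overdue checks separately from failing ones surfaces controls that nobody has re-verified, which a plain pass/fail view would hide.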

HIPAA Compliance Checklist (Engineering Focus):

## Technical Safeguards Checklist

### Access Control (164.312(a)(1))
- [ ] Unique user identification (no shared accounts)
- [ ] Emergency access procedure (break-glass with audit)
- [ ] Automatic logoff (session timeout after inactivity)
- [ ] Encryption and decryption (PHI encrypted at rest and in transit)

### Audit Controls (164.312(b))
- [ ] Log all PHI access (who, what, when, where, why)
- [ ] Audit logs retained for 6 years minimum
- [ ] Audit logs tamper-proof (append-only)
- [ ] Regular audit log review (automated + manual)

### Integrity (164.312(c)(1))
- [ ] Detect unauthorized alterations to PHI
- [ ] Data integrity verification (checksums, digital signatures)
- [ ] Version control for clinical documents

### Transmission Security (164.312(e)(1))
- [ ] TLS 1.3 (or TLS 1.2 minimum) for all PHI transmission
- [ ] No unencrypted email of PHI
- [ ] VPN for remote access
- [ ] Encrypt all portable media (USB drives, laptops)

### Additional Technical Controls
- [ ] Multi-factor authentication (MFA) for remote access
- [ ] Role-based access control (RBAC) implemented
- [ ] Principle of least privilege enforced
- [ ] Regular vulnerability scanning
- [ ] Penetration testing annually
- [ ] Patch management process (apply security patches within 30 days)
- [ ] Antivirus/antimalware on all endpoints
- [ ] DLP (Data Loss Prevention) to prevent PHI exfiltration
- [ ] Network segmentation (separate PHI systems)
- [ ] IDS/IPS (Intrusion Detection/Prevention)
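
One common way to make an append-only audit log tamper-evident is a hash chain, where each entry's hash covers the previous hash. The sketch below illustrates the idea with the standard library only; the function names are hypothetical, and it complements write-once storage rather than replacing it, since the latest hash still has to be anchored somewhere an attacker cannot rewrite.

```python
import hashlib
import json
from typing import Dict, List

GENESIS_HASH = "0" * 64


def chain_entry(prev_hash: str, entry: Dict) -> str:
    """Hash the previous link together with a canonical JSON form of the entry."""
    payload = json.dumps(entry, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + payload).encode("utf-8")).hexdigest()


def build_chain(entries: List[Dict]) -> List[str]:
    """Return one hash per entry, each depending on all entries before it."""
    hashes, prev = [], GENESIS_HASH
    for entry in entries:
        prev = chain_entry(prev, entry)
        hashes.append(prev)
    return hashes


def first_tampered_index(entries: List[Dict], hashes: List[str]) -> int:
    """Return the index of the first entry whose hash no longer matches, or -1."""
    prev = GENESIS_HASH
    for i, (entry, stored) in enumerate(zip(entries, hashes)):
        if chain_entry(prev, entry) != stored:
            return i
        prev = stored
    return -1


log = [
    {"user_id": "u1", "action": "VIEW"},
    {"user_id": "u2", "action": "VIEW"},
    {"user_id": "u1", "action": "UPDATE"},
]
stored_hashes = build_chain(log)
print(first_tampered_index(log, stored_hashes))
```

Editing or deleting any earlier entry changes every later hash, so a periodic verification job can point at the first altered record.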

8. Interoperability Challenges

8.1 Data Mapping — HL7 to FHIR

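import uuid
from typing import Dict

# HL7Message and HL7Segment are assumed to come from the HL7 v2 parsing layer in use.

class HL7toFHIRMapper: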
    """
    Map HL7 v2 messages to FHIR resources.
    
    Challenges:
    - HL7 v2 is flexible (too flexible) — many local variations
    - Field meanings vary by implementation
    - No universal identifier system
    - Timezone handling
    """
    
    def map_adt_to_bundle(self, hl7_message: HL7Message) -> Dict:
        """
        Convert ADT^A01 (patient admission) to FHIR Bundle with:
        - Patient resource
        - Encounter resource
        - Condition resources (problems)
        """
        msh = hl7_message.get_segment('MSH')
        pid = hl7_message.get_segment('PID')
        pv1 = hl7_message.get_segment('PV1')
        
        bundle = {
            "resourceType": "Bundle",
            "type": "transaction",
            "entry": []
        }
        
        # Map Patient
        patient = self.map_patient(pid)
        bundle['entry'].append({
            "fullUrl": f"urn:uuid:{patient['id']}",
            "resource": patient,
            "request": {"method": "POST", "url": "Patient"}
        })
        
        # Map Encounter
        encounter = self.map_encounter(pv1, patient['id'])
        bundle['entry'].append({
            "fullUrl": f"urn:uuid:{encounter['id']}",
            "resource": encounter,
            "request": {"method": "POST", "url": "Encounter"}
        })
        
        return bundle
    
    def map_patient(self, pid: HL7Segment) -> Dict:
        """Map PID segment to FHIR Patient resource."""
        # PID-3: Patient Identifier List
        mrn = pid.get_component(3, 0)
        mrn_system = pid.get_component(3, 3) or "local"
        
        # PID-5: Patient Name
        family = pid.get_component(5, 0)
        given = pid.get_component(5, 1)
        
        # PID-7: Date of Birth (YYYYMMDD)
        dob_str = pid.get_field(7)
        dob = f"{dob_str[0:4]}-{dob_str[4:6]}-{dob_str[6:8]}" if dob_str else None
        
        # PID-8: Administrative Sex
        gender_map = {'M': 'male', 'F': 'female', 'O': 'other', 'U': 'unknown'}
        gender = gender_map.get(pid.get_field(8), 'unknown')
        
        return {
            "resourceType": "Patient",
            "id": str(uuid.uuid4()),
            "identifier": [{
                "system": f"urn:oid:{mrn_system}",
                "value": mrn,
                "type": {
                    "coding": [{
                        "system": "http://terminology.hl7.org/CodeSystem/v2-0203",
                        "code": "MR"
                    }]
                }
            }],
            "name": [{
                "use": "official",
                "family": family,
                "given": [given]
            }],
            "gender": gender,
            "birthDate": dob
        }
    
    def map_encounter(self, pv1: HL7Segment, patient_id: str) -> Dict:
        """Map PV1 segment to FHIR Encounter resource."""
        # PV1-2: Patient Class (I=Inpatient, O=Outpatient, E=Emergency)
        class_map = {
            'I': 'IMP',  # Inpatient encounter
            'O': 'AMB',  # Ambulatory
            'E': 'EMER', # Emergency
        }
        patient_class = class_map.get(pv1.get_field(2), 'AMB')
        
        # PV1-3: Assigned Patient Location
        location = pv1.get_field(3)
        room = pv1.get_component(3, 1)
        bed = pv1.get_component(3, 2)
        
        # PV1-7: Attending Doctor
        attending_id = pv1.get_component(7, 0)
        
        # PV1-44: Admit Date/Time
        admit_datetime = pv1.get_field(44)
        
        return {
            "resourceType": "Encounter",
            "id": str(uuid.uuid4()),
            "status": "in-progress",
            "class": {
                "system": "http://terminology.hl7.org/CodeSystem/v3-ActCode",
                "code": patient_class
            },
            "subject": {
                "reference": f"urn:uuid:{patient_id}"
            },
            "participant": [{
                "type": [{
                    "coding": [{
                        "system": "http://terminology.hl7.org/CodeSystem/v3-ParticipationType",
                        "code": "ATND",
                        "display": "attender"
                    }]
                }],
                "individual": {
                    "reference": f"Practitioner/{attending_id}"
                }
            }],
            "period": {
                "start": self.parse_hl7_datetime(admit_datetime)
            },
            "location": [{
                "location": {
                    "display": f"Room {room}, Bed {bed}"
                }
            }]
        }
    
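    def parse_hl7_datetime(self, hl7_datetime: str, default_offset: str = "+00:00") -> str:
        """
        Convert HL7 datetime format to ISO 8601. Returns None for empty input.
        
        HL7: YYYYMMDD[HHmm[ss[.SSSS]]][+/-ZZZZ]
        ISO: YYYY-MM-DDTHH:mm:ss±HH:MM
        
        If the message carries no offset, default_offset is used. Set it to the
        sending facility's timezone rather than assuming the value is UTC.
        """
        if not hl7_datetime:
            return None
        
        # Split off a trailing +ZZZZ / -ZZZZ offset, if present
        offset = default_offset
        body = hl7_datetime
        if len(body) > 5 and body[-5] in "+-" and body[-4:].isdigit():
            offset = f"{body[-5:-2]}:{body[-2:]}"
            body = body[:-5]
        
        # Drop fractional seconds and pad missing time parts with zeros
        digits = body.split(".")[0].ljust(14, "0")
        
        year, month, day = digits[0:4], digits[4:6], digits[6:8]
        hour, minute, second = digits[8:10], digits[10:12], digits[12:14]
        
        return f"{year}-{month}-{day}T{hour}:{minute}:{second}{offset}"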
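
The mapper relies on `HL7Message.get_segment`, `HL7Segment.get_field`, and `HL7Segment.get_component`, which the section does not define. The following is a simplified stand-in for those classes, assuming the default delimiters (`|`, `^`, `~`); it ignores escape sequences, subcomponents, and repeated segments, so a production system would normally use a dedicated HL7 library instead.

```python
from typing import List, Optional


class HL7Segment:
    """One pipe-delimited HL7 v2 segment, e.g. 'PID|1||12345^^^HOSP^MR||DOE^JANE'."""

    def __init__(self, raw: str):
        parts = raw.split("|")
        self.name = parts[0]
        if self.name == "MSH":
            # MSH-1 is the field separator itself, so re-insert it to keep
            # list index n aligned with field number MSH-n
            parts = [parts[0], "|"] + parts[1:]
        self._fields = parts

    def get_field(self, number: int) -> Optional[str]:
        """Return field `number` (1-based, as in PID-3), or None if absent or empty."""
        if number < len(self._fields) and self._fields[number]:
            return self._fields[number]
        return None

    def get_component(self, number: int, index: int) -> Optional[str]:
        """Return component `index` (0-based) of the first repetition of a field."""
        field = self.get_field(number)
        if field is None:
            return None
        components = field.split("~")[0].split("^")
        if index < len(components) and components[index]:
            return components[index]
        return None


class HL7Message:
    def __init__(self, raw: str):
        # Segments are separated by carriage returns; tolerate newlines too
        lines = [line for line in raw.replace("\n", "\r").split("\r") if line]
        self._segments: List[HL7Segment] = [HL7Segment(line) for line in lines]

    def get_segment(self, name: str) -> Optional[HL7Segment]:
        """Return the first segment with the given name, or None."""
        return next((s for s in self._segments if s.name == name), None)


# Made-up sample message for illustration only
SAMPLE = "\r".join([
    "MSH|^~\\&|ADT|HOSP|EHR|HOSP|20260419103000||ADT^A01|MSG0001|P|2.5",
    "PID|1||12345^^^HOSP^MR||DOE^JANE||19800115|F",
    "PV1|1|I|4W^401^B",
])
message = HL7Message(SAMPLE)
print(message.get_segment("PID").get_component(5, 0))
```

Note the mixed indexing the mapper expects: field numbers follow the HL7 convention (1-based), while component indexes are 0-based.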

8.2 Vendor Integration — Epic, Cerner APIs

import requests
from typing import Dict, Optional

class EpicFHIRClient:
    """
    Epic FHIR API client (Epic is the largest EHR vendor in US).
    
    Epic uses SMART on FHIR for OAuth 2.0 authentication.
    """
    
    def __init__(self, base_url: str, client_id: str, client_secret: str):
        self.base_url = base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = None
    
    def get_authorization_url(self, redirect_uri: str, state: str) -> str:
        """
        Step 1: Get authorization URL for OAuth 2.0 flow.
        User will be redirected to Epic login page.
        """
        # Discover OAuth endpoints from .well-known/smart-configuration
        smart_config = requests.get(f"{self.base_url}/.well-known/smart-configuration").json()
        
        authorize_url = smart_config['authorization_endpoint']
        
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': redirect_uri,
            'scope': 'patient/Patient.read patient/Observation.read launch/patient',
            'state': state,
            'aud': self.base_url
        }
        
        # Let requests build the query string so values such as the space-separated scope are URL-encoded
        return requests.Request('GET', authorize_url, params=params).prepare().url
    
    def exchange_code_for_token(self, code: str, redirect_uri: str) -> Dict:
        """
        Step 2: Exchange authorization code for access token.
        """
        smart_config = requests.get(f"{self.base_url}/.well-known/smart-configuration").json()
        token_url = smart_config['token_endpoint']
        
        data = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': redirect_uri,
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }
        
        response = requests.post(token_url, data=data)
        response.raise_for_status()
        
        token_response = response.json()
        self.access_token = token_response['access_token']
        
        return token_response
    
    def get_patient(self, patient_id: str) -> Dict:
        """Get Patient resource from Epic."""
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Accept': 'application/fhir+json'
        }
        
        response = requests.get(
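            # base_url is the FHIR base (the same value used for 'aud' and SMART discovery),
            # e.g. one ending in /api/FHIR/R4 on Epic
            f"{self.base_url}/Patient/{patient_id}",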
            headers=headers
        )
        response.raise_for_status()
        
        return response.json()


class CernerFHIRClient:
    """
    Cerner FHIR API client (second largest EHR vendor).
    
    Cerner also uses SMART on FHIR, similar to Epic.
    """
    
    # Implementation similar to EpicFHIRClient
    pass
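
Two details of the authorization step are easy to get wrong: the query string must be URL-encoded (the scope contains spaces), and the `state` value must be unguessable and checked on the callback. The sketch below shows both with the standard library only; the function names and the `example.org` URLs are placeholders, not Epic or Cerner endpoints.

```python
import secrets
from urllib.parse import parse_qs, urlencode, urlsplit


def build_authorize_url(authorize_endpoint: str, client_id: str,
                        redirect_uri: str, fhir_base_url: str, state: str) -> str:
    """Assemble the SMART on FHIR authorization request URL."""
    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": "patient/Patient.read patient/Observation.read launch/patient",
        "state": state,
        "aud": fhir_base_url,
    }
    # urlencode percent-encodes reserved characters and turns spaces into '+'
    return f"{authorize_endpoint}?{urlencode(params)}"


def new_state() -> str:
    """Unguessable value that binds the callback to this browser session."""
    return secrets.token_urlsafe(32)


def state_matches(expected: str, returned: str) -> bool:
    # Constant-time comparison of the stored state with the one on the callback
    return secrets.compare_digest(expected, returned)


state = new_state()
url = build_authorize_url(
    "https://ehr.example.org/oauth2/authorize",
    "my-app",
    "https://app.example.org/callback",
    "https://ehr.example.org/fhir",
    state,
)
print(parse_qs(urlsplit(url).query)["scope"])
```

The state would be stored in the user's session before redirecting and compared with `state_matches` before the authorization code is exchanged.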

9. Interview Questions — Design and Trade-offs

Q1: Design a HIPAA-compliant EHR system

Requirements:

  • Support 1000+ concurrent users
  • Store patient demographics, encounters, medications, lab results
  • Comply with HIPAA Security Rule
  • 99.9% uptime SLA
  • Sub-200ms API response time

Solution Outline:

Architecture:

┌─────────────────────────────────────────────────────────────┐
│   Load Balancer (AWS ALB) + WAF                             │
│   - TLS termination                                         │
│   - DDoS protection                                         │
└──────────────┬──────────────────────────────────────────────┘
               │
    ┌──────────┴──────────┐
    │                     │
┌───▼────┐          ┌─────▼────┐
│API GW 1│          │ API GW 2 │  (Auto-scaling)
└───┬────┘          └─────┬────┘
    │                     │
    └──────────┬──────────┘
               │
    ┌──────────▼──────────────────────────────────┐
    │   Application Servers (Kubernetes)          │
    │   - PHI encryption/decryption               │
    │   - RBAC enforcement                        │
    │   - Audit logging                           │
    └──────────┬──────────────┬───────────────────┘
               │              │
       ┌───────▼─────┐   ┌────▼──────────┐
       │   Primary   │   │   Read        │
       │   DB (RDS)  │◄──┤   Replicas    │
       │   (PHI)     │   │               │
       └─────────────┘   └───────────────┘
               │
       ┌───────▼──────────┐
       │  Audit DB        │
       │  (PostgreSQL)    │
       │  (append-only)   │
       └──────────────────┘

Key Design Decisions:

  1. Database Encryption: TDE (Transparent Data Encryption) + application-layer encryption
  2. Access Control: JWT tokens with short expiry (15 min), RBAC with attribute-based conditions
  3. Audit Logging: Asynchronous logging to separate database, streamed to SIEM
  4. High Availability: Multi-AZ deployment, automated failover, regular DR drills
  5. Performance: Redis cache for read-heavy data (metadata, code tables), connection pooling
  6. Compliance: Automated compliance checks, regular penetration testing, SOC 2 attestation

Trade-offs:

  • Security vs Performance: Encryption adds latency (~5-10ms), accept for compliance
  • Consistency vs Availability: Use strong consistency for writes, eventual consistency for reads from replicas
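  • Cost vs Compliance: HSM-backed key management is expensive; HIPAA does not mandate an HSM, but strong key protection makes the encryption safeguards easier to defend in an audit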
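
The 99.9% uptime requirement is easier to reason about as a downtime budget. A small worked calculation follows; the helper name is hypothetical.

```python
def downtime_budget_minutes(sla_percent: float, period_days: float) -> float:
    """Minutes of allowed downtime in a period for a given availability SLA."""
    return (1 - sla_percent / 100) * period_days * 24 * 60


monthly = downtime_budget_minutes(99.9, 30)
yearly = downtime_budget_minutes(99.9, 365)
print(f"per 30-day month: {monthly:.1f} min")   # 43.2 min
print(f"per 365-day year: {yearly / 60:.2f} h")  # 8.76 h
```

At roughly 43 minutes per month, a single slow manual failover can consume the whole budget, which is the argument for the automated Multi-AZ failover in the design decisions.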

Q2: How would you handle a patient merging two duplicate accounts?

Challenge: Detect that Patient A and Patient B are actually the same person, merge all PHI while maintaining audit trail.

Solution:

def merge_patients(keep_patient_id: str, merge_patient_id: str, operator_id: str) -> bool:
    """
    Merge duplicate patient records while maintaining complete audit trail.
    
    Steps:
    1. Verify operator has authority to merge
    2. Lock both patient records
    3. Merge data from merge_patient → keep_patient
    4. Mark merge_patient as merged (DON'T delete — maintains referential integrity)
    5. Create merge audit record
    6. Notify all systems of merge
    """
    
    # 1. Authorization check
    if not has_permission(operator_id, 'PATIENT_MERGE'):
        raise PermissionError("User not authorized to merge patients")
    
    # 2. Begin transaction
    with db.transaction():
        # Lock both patients
        keep_patient = db.query("SELECT * FROM patients WHERE patient_id = %s FOR UPDATE", [keep_patient_id])
        merge_patient = db.query("SELECT * FROM patients WHERE patient_id = %s FOR UPDATE", [merge_patient_id])
        
        # 3. Merge demographics (prefer most complete record)
        merged_demographics = merge_demographics_fields(keep_patient, merge_patient)
        db.execute("UPDATE patients SET ... WHERE patient_id = %s", [keep_patient_id])
        
        # 4. Reassign all linked records
        tables = [
            'encounters', 'observations', 'medication_orders',
            'diagnoses', 'procedures', 'allergies', 'lab_results'
        ]
        
        for table in tables:
            db.execute(f"""
                UPDATE {table}
                SET patient_id = %s,
                    updated_at = now(),
                    updated_by = %s,
                    update_reason = 'PATIENT_MERGE'
                WHERE patient_id = %s
            """, [keep_patient_id, operator_id, merge_patient_id])
        
        # 5. Mark merge_patient as merged (DON'T delete)
        db.execute("""
            UPDATE patients
            SET is_merged = TRUE,
                merged_into_patient_id = %s,
                merged_at = now(),
                merged_by = %s
            WHERE patient_id = %s
        """, [keep_patient_id, operator_id, merge_patient_id])
        
        # 6. Create merge audit record
        db.execute("""
            INSERT INTO patient_merge_history
            (keep_patient_id, merge_patient_id, merged_by, merged_at, reason)
            VALUES (%s, %s, %s, now(), %s)
        """, [keep_patient_id, merge_patient_id, operator_id, "Duplicate record"])
        
    # 7. Publish event to all integrated systems, only after the transaction has committed
    event_bus.publish('PatientMerged', {
        'keep_patient_id': keep_patient_id,
        'merge_patient_id': merge_patient_id
    })
    
    return True
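
The merge function covers the second half of the challenge; the first half is detecting that two records describe the same person. The sketch below is a deliberately simple weighted match score on demographics. The field set, weights, and thresholds are illustrative assumptions, not values from any real master patient index, and anything short of certainty is routed to manual review.

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass
class Demographics:
    family_name: str
    given_name: str
    date_of_birth: date
    phone: Optional[str] = None


def _norm(text: str) -> str:
    # Case- and whitespace-insensitive comparison key
    return "".join(text.lower().split())


def match_score(a: Demographics, b: Demographics) -> float:
    """Weighted agreement score in [0, 1]; the weights are illustrative only."""
    score = 0.0
    if _norm(a.family_name) == _norm(b.family_name):
        score += 0.3
    if _norm(a.given_name) == _norm(b.given_name):
        score += 0.2
    if a.date_of_birth == b.date_of_birth:
        score += 0.35
    if a.phone and b.phone and a.phone == b.phone:
        score += 0.15
    return round(score, 2)


def classify(score: float, auto: float = 0.95, review: float = 0.7) -> str:
    """Map a score to an action; even high scores stay candidates until confirmed."""
    if score >= auto:
        return "AUTO_LINK_CANDIDATE"
    if score >= review:
        return "MANUAL_REVIEW"
    return "NO_MATCH"


a = Demographics("Nguyen", "An", date(1980, 1, 15), "0901234567")
b = Demographics(" nguyen", "AN", date(1980, 1, 15))
print(classify(match_score(a, b)))
```

A false merge mixes two people's clinical histories, so the thresholds should lean toward manual review rather than automatic linking.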

Q3: Design data retention policy balancing HIPAA requirements with GDPR "right to be forgotten"

Conflict:

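  • HIPAA and state law: HIPAA requires compliance documentation to be kept for 6 years; medical record retention periods come from state law and are often as long or longer. The code below uses 6 years as the working retention period
  • GDPR: Patients have a "right to erasure" of personal data (Article 17), with exceptions where retention is a legal obligation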

Solution:

class DataRetentionPolicy:
    """
    Implement compliant data retention balancing HIPAA and GDPR.
    
    Strategy:
    1. HIPAA takes precedence for active medical care
    2. After retention period expires, support GDPR erasure
    3. De-identify rather than delete (preserves research value)
    4. Maintain minimal audit trail even after erasure
    """
    
    def handle_erasure_request(self, patient_id: str, request_date: datetime) -> str:
        """
        Process GDPR erasure request.
        
        Returns: Status ('DEFERRED' or 'COMPLETED')
        """
        # 1. Check if retention period has expired
        last_encounter = db.query("""
            SELECT MAX(discharge_time) as last_encounter
            FROM encounters
            WHERE patient_id = %s
        """, [patient_id])[0]['last_encounter']
        
        retention_expiry = last_encounter + timedelta(days=6*365)  # 6 years
        
        if datetime.now() < retention_expiry:
            # Still within retention period — DEFER erasure
            return self.defer_erasure(patient_id, retention_expiry)
        
        # 2. Retention period expired — proceed with de-identification
        return self.de_identify_patient(patient_id)
    
    def defer_erasure(self, patient_id: str, defer_until: datetime) -> str:
        """
        Log erasure request but defer until retention period expires.
        """
        db.execute("""
            INSERT INTO pending_erasure_requests
            (patient_id, requested_at, deferred_until, reason)
            VALUES (%s, now(), %s, 'HIPAA_RETENTION')
        """, [patient_id, defer_until])
        
        # Notify patient
        send_notification(patient_id, f"""
            Your data erasure request has been received.
            Per HIPAA requirements, medical records must be retained until {defer_until.strftime('%Y-%m-%d')}.
            Your data will be automatically erased after this date unless required by law.
        """)
        
        return 'DEFERRED'
    
    def de_identify_patient(self, patient_id: str) -> str:
        """
        De-identify patient data (Safe Harbor method).
        
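        The goal is to keep the clinical data that retention rules require while
        removing what makes it identifiable. Whether the result counts as
        anonymous under GDPR still needs legal review.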
        """
        with db.transaction():
            # Replace identifiers with anonymous tokens
            anonymous_id = str(uuid.uuid4())
            
            # De-identify patient record
            db.execute("""
                UPDATE patients
                SET
                    encrypted_first_name = %s,
                    encrypted_last_name = %s,
                    date_of_birth = date_trunc('year', date_of_birth),  -- Keep year only
                    encrypted_ssn = NULL,
                    encrypted_address = NULL,
                    encrypted_phone = NULL,
                    is_de_identified = TRUE,
                    de_identified_at = now(),
                    original_patient_id = %s
                WHERE patient_id = %s
            """, [b'[REDACTED]', b'[REDACTED]', patient_id, patient_id])
            
            # De-identify audit logs (remove IP, user agent)
            db.execute("""
                UPDATE phi_access_log
                SET
                    ip_address = '0.0.0.0',
                    user_agent = '[REDACTED]'
                WHERE patient_id = %s
            """, [patient_id])
            
            # Log the de-identification
            db.execute("""
                INSERT INTO de_identification_log
                (patient_id, de_identified_at, method)
                VALUES (%s, now(), 'SAFE_HARBOR')
            """, [patient_id])
        
        return 'COMPLETED'
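
Safe Harbor also constrains the values that remain after redaction: dates are reduced to the year, ages above 89 are collapsed into one bucket, and ZIP codes are cut to three digits or zeroed out for sparsely populated areas. The sketch below shows those generalization rules under stated assumptions: the function names are hypothetical, and the set of low-population ZIP prefixes must come from current census data, which is not included here.

```python
from datetime import date
from typing import Set


def age_on(dob: date, today: date) -> int:
    """Age in completed years on a given day."""
    years = today.year - dob.year
    # Subtract one if the birthday has not happened yet this year
    if (today.month, today.day) < (dob.month, dob.day):
        years -= 1
    return years


def generalize_birth_year(dob: date, today: date) -> str:
    """Keep only the year; ages above 89 collapse into a single '90+' bucket."""
    return "90+" if age_on(dob, today) > 89 else str(dob.year)


def generalize_zip(zip_code: str, small_population_prefixes: Set[str]) -> str:
    """Keep the first three digits unless that area has 20,000 or fewer residents."""
    prefix = zip_code[:3]
    return "000" if prefix in small_population_prefixes else prefix


today = date(2026, 4, 19)
print(generalize_birth_year(date(1980, 1, 15), today))
print(generalize_birth_year(date(1930, 1, 1), today))
# The prefix set below is a placeholder, not real census data
print(generalize_zip("94110", {"036"}))
```

Keeping the birth year for a patient over 89 would leave an identifying element in the record, which is why the age check runs before the year is emitted.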

Summary

Healthcare IT is a domain that demands a combination of:

  • Deep technical knowledge: Encryption, access control, audit logging
  • Strict regulatory compliance: HIPAA penalties can reach $50K per violation
  • High reliability: Life-critical systems cannot be allowed to fail
  • Interoperability: Integration with legacy systems (HL7 v2) and modern APIs (FHIR)

Key Takeaways for Senior Engineers:

  1. Security First: Every design decision must weigh its impact on security and compliance
  2. Audit Everything: Every PHI access must be logged in full, in tamper-proof storage
  3. Encrypt Everywhere: At rest, in transit, and in memory where possible
  4. Defense in Depth: Never rely on a single security layer
  5. Prepare for Breaches: Keep an incident response plan and a breach notification process ready
  6. Understand Regulations: HIPAA is not just a "checkbox"; you have to understand the spirit of the law
  7. Balance Trade-offs: Security vs usability, compliance cost vs risk, performance vs audit overhead

Healthcare IT is not for the faint of heart: a bug can endanger lives, and a security hole can lead to a breach with fines in the millions of dollars. It is also a domain with real impact on people's lives, and a chance for senior engineers to show they can design systems that are complex, secure, and compliant.


References

Standards & Specifications:

Regulations:

Books:

  • "HL7 for BizTalk" by Howard Edidin (good intro to HL7)
  • "Learning FHIR" by Thomas Beale (comprehensive FHIR guide)
  • "Information Security in Healthcare" by Rainu Kaushal

Tools & Platforms: