Emergency Hotline: Call 1-844-363-1423 (United We Dream Hotline)
ICE Encounter

Alert System Architecture: Streaming and Notifications

Moving from passive monitoring to active alerting requires an event-driven architecture that can process aircraft telemetry as it arrives and deliver verified alerts to rapid response networks.


System Architecture Overview

Component Layers

┌─────────────────────────────────────────────────────────────┐
│                    ALERT DISTRIBUTION                        │
│         Signal Groups / Telegram Channels / Webhooks         │
└─────────────────────────────────────────────────────────────┘
                              ↑
┌─────────────────────────────────────────────────────────────┐
│                    VERIFICATION LAYER                        │
│    Geofence Check / Descent Profile / Secondary Intel        │
└─────────────────────────────────────────────────────────────┘
                              ↑
┌─────────────────────────────────────────────────────────────┐
│                    FILTER WORKERS                            │
│         Watchlist Match / Call Sign Filter / LADD/PIA        │
└─────────────────────────────────────────────────────────────┘
                              ↑
┌─────────────────────────────────────────────────────────────┐
│                    MESSAGE BROKER                            │
│              Apache Kafka / Redis Pub/Sub                    │
└─────────────────────────────────────────────────────────────┘
                              ↑
┌─────────────────────────────────────────────────────────────┐
│                    INGESTION DAEMONS                         │
│    API Polling / Local Receiver JSON / WebSocket Feeds       │
└─────────────────────────────────────────────────────────────┘

Data Flow

Stage    Component             Function
-------  --------------------  ---------------------------------
Ingest   Polling daemon        Fetch aircraft data from sources
Queue    Message broker        Buffer and distribute messages
Filter   Worker process        Match against watchlist
Verify   Geospatial engine     Check position against geofences
Alert    Notification service  Deliver to encrypted channels
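
The five stages in the table can be sketched end to end without a broker. This is only an illustrative sketch: the function names, the sample records, the watchlist entry, and the bounding box are hypothetical, and an in-memory deque stands in for the message broker.

```python
from collections import deque

# Hypothetical sample records; real records come from the ingestion daemons.
SAMPLE_FEED = [
    {"hex": "a1b2c3", "lat": 31.40, "lon": -92.50},
    {"hex": "ffffff", "lat": 31.40, "lon": -92.50},
    {"hex": "a1b2c3", "lat": 45.00, "lon": -100.00},
]
WATCHLIST = {"A1B2C3"}              # hypothetical watchlist entry
BOX = (31.0, 32.0, -93.0, -92.0)    # hypothetical lat/lon bounding box


def ingest(feed, queue):
    """Ingest: push every raw record onto the queue."""
    for record in feed:
        queue.append(record)


def matches_watchlist(record):
    """Filter: keep only records whose hex code is on the watchlist."""
    return record.get("hex", "").upper() in WATCHLIST


def inside_box(record):
    """Verify: crude stand-in for the geofence check."""
    lat_min, lat_max, lon_min, lon_max = BOX
    return (lat_min <= record["lat"] <= lat_max
            and lon_min <= record["lon"] <= lon_max)


def run_pipeline(feed):
    """Run ingest -> queue -> filter -> verify and return the alert strings."""
    queue = deque()                 # stands in for the message broker
    ingest(feed, queue)
    alerts = []
    while queue:
        record = queue.popleft()
        if matches_watchlist(record) and inside_box(record):
            alerts.append(f"ALERT {record['hex'].upper()}")  # Alert stage
    return alerts


print(run_pipeline(SAMPLE_FEED))  # ['ALERT A1B2C3']
```

Only the first record passes both the watchlist and the position check. In the full system each stage runs as its own process, so a slow notification channel does not stall ingestion.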

Ingestion Layer

API Polling

import requests
import time
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

API_URL = "https://adsbexchange.com/api/aircraft/v2/all"
API_KEY = "your_api_key"

def poll_aircraft():
    """Poll API and publish to Kafka"""
    headers = {"api-auth": API_KEY}

    while True:
        try:
            response = requests.get(API_URL, headers=headers, timeout=30)
            data = response.json()

            for aircraft in data.get("ac", []):
                producer.send('aircraft_positions', aircraft)

            producer.flush()

        except Exception as e:
            print(f"Poll error: {e}")

        time.sleep(5)

Local Receiver Integration

import requests
import time
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

LOCAL_URL = "http://localhost/tar1090/data/aircraft.json"

def poll_local_receiver():
    """Poll local tar1090 and publish to Kafka"""

    while True:
        try:
            response = requests.get(LOCAL_URL, timeout=10)
            # Surface HTTP errors instead of parsing an error page as JSON
            response.raise_for_status()
            data = response.json()

            for aircraft in data.get("aircraft", []):
                # Add timestamp and source metadata
                aircraft["_source"] = "local_receiver"
                aircraft["_timestamp"] = data.get("now")
                producer.send('aircraft_positions', aircraft)

            producer.flush()

        except Exception as e:
            print(f"Local poll error: {e}")

        time.sleep(2)

Message Broker Configuration

Apache Kafka Setup

# docker-compose.yml
version: '3'
services:
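  zookeeper:
    # Pinned to a 7.x tag (7.4.0 is an example). Kafka 4.0 removed ZooKeeper
    # mode, so an unpinned "latest" tag may not start with this configuration.
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0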
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

Topic Configuration

Topic               Purpose            Partitions
------------------  -----------------  ----------
aircraft_positions  Raw position data  3
watchlist_matches   Filtered matches   1
verified_alerts     Confirmed alerts   1
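
The topic layout above could be created from Python rather than by hand. The sketch below assumes the kafka-python package (the same library used by the producers in this section) and a single broker at localhost:9092, hence a replication factor of 1. The helper names build_topic_specs and create_topics are hypothetical.

```python
# Topic layout copied from the table above: name -> partition count.
TOPICS = {
    "aircraft_positions": 3,
    "watchlist_matches": 1,
    "verified_alerts": 1,
}


def build_topic_specs(replication_factor=1):
    """Return (name, partitions, replication_factor) tuples for each topic."""
    return [(name, parts, replication_factor) for name, parts in TOPICS.items()]


def create_topics(bootstrap_servers="localhost:9092"):
    """Create the topics on the broker.

    Needs kafka-python and a running broker, so the import is done lazily
    and this function is not called at import time.
    """
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics(
            new_topics=[
                NewTopic(name=name, num_partitions=parts, replication_factor=rf)
                for name, parts, rf in build_topic_specs()
            ]
        )
    finally:
        admin.close()


print(build_topic_specs())
```

Calling create_topics() once after the broker starts is enough. Giving the raw position topic three partitions lets up to three filter workers in one consumer group share the load.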

Redis Alternative

import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

def publish_position(aircraft):
    """Publish aircraft position to Redis channel"""
    r.publish('aircraft_positions', json.dumps(aircraft))

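def subscribe_positions(handler):
    """Subscribe to aircraft positions and pass each decoded record to handler.

    handler is any callable that accepts one aircraft dict.
    """
    pubsub = r.pubsub()
    pubsub.subscribe('aircraft_positions')

    for message in pubsub.listen():
        # listen() also yields subscribe confirmations; only 'message' carries data
        if message['type'] == 'message':
            aircraft = json.loads(message['data'])
            handler(aircraft)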

Filter Layer

Watchlist Matching

from kafka import KafkaConsumer, KafkaProducer
import json

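# Load watchlist: the code expects a JSON array of objects with a "hex" field
with open('watchlist.json', 'r') as f:
    watchlist_data = json.load(f)

# Normalize to upper case so lookups are case-insensitive
WATCHLIST = {ac['hex'].upper() for ac in watchlist_data}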

consumer = KafkaConsumer(
    'aircraft_positions',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def filter_watchlist():
    """Filter stream for watchlist aircraft"""

    for message in consumer:
        aircraft = message.value
        hex_code = aircraft.get('hex', '').upper()

        if hex_code in WATCHLIST:
            producer.send('watchlist_matches', aircraft)
            producer.flush()

Call Sign Filtering

import re

RPN_PATTERN = re.compile(r'^RPN\d+', re.IGNORECASE)

def is_rpn_callsign(callsign):
    """Check if call sign matches RPN pattern"""
    if not callsign:
        return False
    return bool(RPN_PATTERN.match(callsign.strip()))

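def filter_callsigns():
    """Filter for RPN call signs.

    Reuses the consumer and producer from the watchlist example. Run this in
    its own worker with its own consumer; one consumer can drive only one loop.
    """

    for message in consumer:
        aircraft = message.value
        callsign = aircraft.get('flight', '')

        if is_rpn_callsign(callsign):
            aircraft['_rpn_match'] = True
            producer.send('watchlist_matches', aircraft)
            producer.flush()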
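
To make the matching behavior concrete, the pattern can be run against a few inputs. The sample call signs are hypothetical; the pattern and function are repeated from above so the snippet runs on its own.

```python
import re

RPN_PATTERN = re.compile(r'^RPN\d+', re.IGNORECASE)


def is_rpn_callsign(callsign):
    """Same check as above: 'RPN' followed by at least one digit."""
    if not callsign:
        return False
    return bool(RPN_PATTERN.match(callsign.strip()))


# Hypothetical inputs; the flight field may arrive padded with spaces.
samples = ["RPN123  ", "rpn45", "RPN", "UAL123", "XRPN12", "", None]
for s in samples:
    print(repr(s), is_rpn_callsign(s))
```

Only the first two samples match. The pattern is anchored at the start but not at the end, so a value such as "RPN12X" would also match; append `$` to the pattern if trailing characters should be rejected.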

LADD/PIA Detection

def check_privacy_flags(aircraft):
    """Check for privacy program enrollment"""
    db_flags = aircraft.get('dbFlags', 0)

    return {
        'is_pia': bool(db_flags & 4),
        'is_ladd': bool(db_flags & 8),
        'is_military': bool(db_flags & 1)
    }
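
The bit masks used above can be given names with enum.IntFlag, which makes the decoded result easier to log. This is a sketch: the class name DbFlag and the helper decode_db_flags are hypothetical, and only the three bit values already used in check_privacy_flags are included.

```python
from enum import IntFlag


class DbFlag(IntFlag):
    """Bit masks taken from check_privacy_flags above."""
    MILITARY = 1
    PIA = 4
    LADD = 8


def decode_db_flags(aircraft):
    """Return the names of the flags set in the record's dbFlags field."""
    value = aircraft.get("dbFlags", 0) or 0
    return [flag.name for flag in DbFlag if value & flag]


print(decode_db_flags({"dbFlags": 12}))     # 12 = 4 + 8 -> ['PIA', 'LADD']
print(decode_db_flags({"hex": "abc123"}))   # field absent -> []
```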

Geospatial Verification

Geofence Definition

GEOFENCES = [
    {
        "name": "Alexandria Staging",
        "lat": 31.3274,
        "lon": -92.5499,
        "radius_miles": 30,
        "type": "staging_hub"
    },
    {
        "name": "Mesa Gateway",
        "lat": 33.3078,
        "lon": -111.6556,
        "radius_miles": 30,
        "type": "staging_hub"
    },
    {
        "name": "Brownsville",
        "lat": 25.9069,
        "lon": -97.4258,
        "radius_miles": 30,
        "type": "staging_hub"
    }
]

Haversine Geofencing

from math import radians, sin, cos, sqrt, atan2

def haversine(lat1, lon1, lat2, lon2):
    """Calculate great-circle distance in miles"""
    R = 3959

    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))

    return R * c

def check_geofences(aircraft):
    """Check if aircraft is within any geofence"""
    lat = aircraft.get('lat')
    lon = aircraft.get('lon')

    if lat is None or lon is None:
        return None

    for fence in GEOFENCES:
        distance = haversine(lat, lon, fence['lat'], fence['lon'])
        if distance <= fence['radius_miles']:
            return {
                'geofence': fence['name'],
                'distance_miles': round(distance, 1),
                'type': fence['type']
            }

    return None
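
A quick sanity check of the distance calculation: with R = 3959, one degree of latitude is about 69.1 miles, so a point a quarter of a degree north of a fence center should fall inside a 30-mile radius and a point one degree north should not. The two test points below are hypothetical; the center is the "Alexandria Staging" entry from GEOFENCES.

```python
from math import radians, sin, cos, sqrt, atan2


def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles (same formula as above, R = 3959)."""
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    a = (sin((lat2 - lat1) / 2) ** 2
         + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2)
    return 3959 * 2 * atan2(sqrt(a), sqrt(1 - a))


CENTER = (31.3274, -92.5499)   # "Alexandria Staging" center from GEOFENCES
RADIUS_MILES = 30

# Hypothetical test points: 0.25 and 1.0 degrees due north of the center.
near = (CENTER[0] + 0.25, CENTER[1])
far = (CENTER[0] + 1.0, CENTER[1])

for label, (lat, lon) in (("near", near), ("far", far)):
    d = haversine(lat, lon, *CENTER)
    print(label, round(d, 1), "inside" if d <= RADIUS_MILES else "outside")
# near 17.3 inside
# far 69.1 outside
```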

Descent Profile Analysis

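def analyze_descent_profile(aircraft, history):
    """Determine if aircraft is landing.

    history is a list of earlier records for the same aircraft, oldest first.
    """

    current_alt = aircraft.get('alt_baro', 40000)
    baro_rate = aircraft.get('baro_rate', 0)

    # Some feeds report alt_baro as the string "ground" once the aircraft is down
    if not isinstance(current_alt, (int, float)):
        return {'landing': False, 'phase': 'on_ground'}

    # Check for descent
    if baro_rate >= 0:
        return {'landing': False, 'phase': 'level_or_climbing'}

    # Check altitude threshold
    if current_alt > 10000:
        return {'landing': False, 'phase': 'high_altitude_descent'}

    # Check for sustained descent over the most recent numeric readings
    altitudes = [p.get('alt_baro') for p in history[-5:]]
    altitudes = [a for a in altitudes if isinstance(a, (int, float))]

    # Require 3+ readings; all() over an empty sequence would be vacuously True
    if len(altitudes) < 3:
        return {'landing': False, 'phase': 'insufficient_history'}

    if all(altitudes[i] >= altitudes[i+1] for i in range(len(altitudes)-1)):
        return {
            'landing': True,
            'phase': 'final_approach',
            'current_altitude': current_alt,
            'descent_rate': baro_rate
        }

    return {'landing': False, 'phase': 'variable_descent'}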

False Positive Mitigation

Multi-Condition Verification

Before generating an alert, the system must verify multiple conditions:

Condition             Threshold                Purpose
--------------------  -----------------------  ---------------------
Watchlist match       Hex code in list         Basic identification
Geofence penetration  Within radius            Geographic relevance
Altitude              <10,000 ft               Landing probability
Descent rate          Negative baro_rate       Active approach
Sustained descent     3+ consecutive readings  Not a temporary dip

Verification Logic

def should_alert(aircraft, history, geofence_result):
    """Multi-condition verification before alerting"""

    # Condition 1: Must be in watchlist (already filtered)

    # Condition 2: Must be within geofence
    if geofence_result is None:
        return False, "Not within any geofence"

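    # Condition 3: Low altitude (alt_baro may be the string "ground")
    altitude = aircraft.get('alt_baro', 40000)
    if not isinstance(altitude, (int, float)):
        return False, "On ground or altitude unavailable"
    if altitude > 10000:
        return False, f"Altitude too high: {altitude} ft"

    # Condition 4: Descending
    baro_rate = aircraft.get('baro_rate', 0)
    if baro_rate >= 0:
        return False, "Not descending"

    # Condition 5: Sustained descent pattern (3+ consecutive readings)
    if len(history) < 3:
        return False, "Not enough history to confirm descent"
    recent_alts = [h.get('alt_baro') for h in history[-3:]]
    if not all(isinstance(a, (int, float)) for a in recent_alts):
        return False, "Altitude history incomplete"
    if not all(recent_alts[i] >= recent_alts[i+1]
               for i in range(len(recent_alts)-1)):
        return False, "Descent not sustained"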

    return True, "All conditions met"

Alert Distribution

Signal Integration

Using signal-cli for encrypted notifications:

# Install signal-cli (shell)
wget https://github.com/AsamK/signal-cli/releases/download/v0.11.0/signal-cli-0.11.0.tar.gz
tar xf signal-cli-0.11.0.tar.gz

# Send alerts from Python
import subprocess
import json

SIGNAL_CLI = "/path/to/signal-cli"
SENDER_NUMBER = "+1234567890"
GROUP_ID = "base64_encoded_group_id"

def send_signal_alert(message):
    """Send alert to Signal group"""

    cmd = [
        SIGNAL_CLI,
        "-u", SENDER_NUMBER,
        "send",
        "-g", GROUP_ID,
        "-m", message
    ]

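    try:
        subprocess.run(cmd, check=True, timeout=30)
        return True
    except subprocess.CalledProcessError as e:
        print(f"Signal send failed: {e}")
        return False
    except subprocess.TimeoutExpired:
        # timeout=30 raises TimeoutExpired, which CalledProcessError does not cover
        print("Signal send timed out")
        return False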

Telegram Integration

import requests

TELEGRAM_TOKEN = "your_bot_token"
TELEGRAM_CHAT_ID = "-100xxxxxxxxx"  # Group ID

def send_telegram_alert(message, tracking_url=None):
    """Send alert to Telegram channel"""

    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"

    # Format message with tracking link
    if tracking_url:
        message += f"\n\n🔗 Track: {tracking_url}"

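    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message,
        # Sent as plain text: with parse_mode set, unescaped characters such
        # as "_" or "*" in the message can make Telegram reject the request
        "disable_web_page_preview": False
    }

    # A timeout keeps a stalled connection from blocking the alert path
    response = requests.post(url, json=payload, timeout=10)
    return response.status_code == 200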

Alert Message Format

def format_alert(aircraft, geofence_result):
    """Format alert message"""

    # "or" also covers keys that are present but null or blank
    hex_code = (aircraft.get('hex') or 'UNKNOWN').upper()
    callsign = (aircraft.get('flight') or 'N/A').strip() or 'N/A'
    altitude = aircraft.get('alt_baro', 'N/A')
    speed = aircraft.get('gs', 'N/A')

    tracking_url = f"https://globe.adsbexchange.com/?icao={hex_code.lower()}"

    message = f"""
⚠️ FLIGHT ALERT

Aircraft: {hex_code}
Call Sign: {callsign}
Altitude: {altitude} ft
Speed: {speed} kts
Location: {geofence_result['geofence']}
Distance: {geofence_result['distance_miles']} mi

Track: {tracking_url}
"""

    return message.strip()

Webhook Integration

Generic Webhook

import requests
import json

WEBHOOK_URLS = [
    "https://hooks.example.com/alert",
    "https://dispatch.rapidresponse.org/webhook"
]

def send_webhook_alert(aircraft, geofence_result, verified_at):
    """Send alert via webhooks"""

    payload = {
        "type": "flight_alert",
        "timestamp": verified_at.isoformat(),
        "aircraft": {
            "hex": aircraft.get('hex'),
            "callsign": aircraft.get('flight'),
            "altitude": aircraft.get('alt_baro'),
            "position": {
                "lat": aircraft.get('lat'),
                "lon": aircraft.get('lon')
            }
        },
        "geofence": geofence_result,
        "tracking_url": f"https://globe.adsbexchange.com/?icao={aircraft.get('hex')}"
    }

    for url in WEBHOOK_URLS:
        try:
            response = requests.post(
                url,
                json=payload,
                timeout=10,
                headers={"Content-Type": "application/json"}
            )
            print(f"Webhook {url}: {response.status_code}")
        except Exception as e:
            print(f"Webhook {url} failed: {e}")

Monitoring and Reliability

Health Checks

import time
from datetime import datetime, timedelta

last_message_time = None
STALE_THRESHOLD = timedelta(minutes=5)

def health_check():
    """Check system health"""

    issues = []

    # Check message freshness
    if last_message_time:
        age = datetime.utcnow() - last_message_time
        if age > STALE_THRESHOLD:
            # total_seconds() includes whole days; .seconds alone wraps every 24 h
            issues.append(f"No messages for {int(age.total_seconds() // 60)} minutes")

    # Check Kafka connection
    try:
        producer.send('health_check', {'timestamp': time.time()})
        producer.flush(timeout=5)
    except Exception as e:
        issues.append(f"Kafka error: {e}")

    return {
        'healthy': len(issues) == 0,
        'issues': issues,
        'checked_at': datetime.utcnow().isoformat()
    }
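
health_check reads last_message_time, but nothing shown in this section updates it. One way to keep it current is a small tracker that the consumer loop calls once per message. This is a sketch: the class name FreshnessMonitor and its methods are hypothetical, and the timestamps are made up for the demonstration.

```python
from datetime import datetime, timedelta, timezone


class FreshnessMonitor:
    """Tracks when the last message arrived and reports staleness."""

    def __init__(self, threshold=timedelta(minutes=5)):
        self.threshold = threshold
        self.last_message_time = None

    def mark(self, now=None):
        """Call once per consumed message."""
        self.last_message_time = now or datetime.now(timezone.utc)

    def stale_minutes(self, now=None):
        """Return whole minutes of silence if past the threshold, else None."""
        if self.last_message_time is None:
            return None
        age = (now or datetime.now(timezone.utc)) - self.last_message_time
        if age > self.threshold:
            return int(age.total_seconds() // 60)
        return None


monitor = FreshnessMonitor()
t0 = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)  # hypothetical timestamp
monitor.mark(now=t0)
print(monitor.stale_minutes(now=t0 + timedelta(minutes=3)))  # None
print(monitor.stale_minutes(now=t0 + timedelta(hours=25)))   # 1500
```

Passing now explicitly makes the staleness logic testable without waiting. In the worker, calling monitor.mark() inside the consumer loop is enough.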

Alerting on System Failures

def alert_system_failure(component, error):
    """Alert operators to system failures"""

    message = f"""
🚨 SYSTEM ALERT

Component: {component}
Error: {error}
Time: {datetime.utcnow().isoformat()}

Manual intervention may be required.
"""

    send_signal_alert(message)

Last updated: March 25, 2026

Legal Disclaimer

This website does not provide legal advice. The information provided on this site is for general informational and educational purposes only. It does not create an attorney-client relationship.

Information on this website may not be current or accurate. Immigration law is complex and varies by jurisdiction and individual circumstances. Always consult with a qualified immigration attorney for advice specific to your situation.

Neither ICE Encounter, its developers, partners, nor any contributors shall be liable for any actions taken or not taken based on information from this site. Use of this site is subject to our Terms of Use and Privacy Policy.