Alert System Architecture: Streaming and Notifications
Transitioning from passive monitoring to active alerting requires robust event-driven architecture capable of processing telemetry and delivering verified alerts to rapid response networks.
System Architecture Overview
Component Layers
┌─────────────────────────────────────────────────────────────┐
│ ALERT DISTRIBUTION │
│ Signal Groups / Telegram Channels / Webhooks │
└─────────────────────────────────────────────────────────────┘
↑
┌─────────────────────────────────────────────────────────────┐
│ VERIFICATION LAYER │
│ Geofence Check / Descent Profile / Secondary Intel │
└─────────────────────────────────────────────────────────────┘
↑
┌─────────────────────────────────────────────────────────────┐
│ FILTER WORKERS │
│ Watchlist Match / Call Sign Filter / LADD/PIA │
└─────────────────────────────────────────────────────────────┘
↑
┌─────────────────────────────────────────────────────────────┐
│ MESSAGE BROKER │
│ Apache Kafka / Redis Pub/Sub │
└─────────────────────────────────────────────────────────────┘
↑
┌─────────────────────────────────────────────────────────────┐
│ INGESTION DAEMONS │
│ API Polling / Local Receiver JSON / WebSocket Feeds │
└─────────────────────────────────────────────────────────────┘
Data Flow
| Stage | Component | Function |
|---|---|---|
| Ingest | Polling daemon | Fetch aircraft data from sources |
| Queue | Message broker | Buffer and distribute messages |
| Filter | Worker process | Match against watchlist |
| Verify | Geospatial engine | Check position against geofences |
| Alert | Notification service | Deliver to encrypted channels |
Ingestion Layer
API Polling
import requests
import time
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
API_URL = "https://adsbexchange.com/api/aircraft/v2/all"
API_KEY = "your_api_key"
def poll_aircraft():
"""Poll API and publish to Kafka"""
headers = {"api-auth": API_KEY}
while True:
try:
response = requests.get(API_URL, headers=headers, timeout=30)
data = response.json()
for aircraft in data.get("ac", []):
producer.send('aircraft_positions', aircraft)
producer.flush()
except Exception as e:
print(f"Poll error: {e}")
time.sleep(5)
Local Receiver Integration
import requests
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
LOCAL_URL = "http://localhost/tar1090/data/aircraft.json"
def poll_local_receiver():
"""Poll local tar1090 and publish to Kafka"""
while True:
try:
response = requests.get(LOCAL_URL, timeout=10)
data = response.json()
for aircraft in data.get("aircraft", []):
# Add timestamp and source metadata
aircraft["_source"] = "local_receiver"
aircraft["_timestamp"] = data.get("now")
producer.send('aircraft_positions', aircraft)
producer.flush()
except Exception as e:
print(f"Local poll error: {e}")
time.sleep(2)
Message Broker Configuration
Apache Kafka Setup
# docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9092:9092"
Topic Configuration
| Topic | Purpose | Partitions |
|---|---|---|
aircraft_positions |
Raw position data | 3 |
watchlist_matches |
Filtered matches | 1 |
verified_alerts |
Confirmed alerts | 1 |
Redis Alternative
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
def publish_position(aircraft):
"""Publish aircraft position to Redis channel"""
r.publish('aircraft_positions', json.dumps(aircraft))
def subscribe_positions():
"""Subscribe to aircraft positions"""
pubsub = r.pubsub()
pubsub.subscribe('aircraft_positions')
for message in pubsub.listen():
if message['type'] == 'message':
aircraft = json.loads(message['data'])
process_aircraft(aircraft)
Filter Layer
Watchlist Matching
from kafka import KafkaConsumer, KafkaProducer
import json
# Load watchlist
WATCHLIST = set()
with open('watchlist.json', 'r') as f:
watchlist_data = json.load(f)
WATCHLIST = {ac['hex'].upper() for ac in watchlist_data}
consumer = KafkaConsumer(
'aircraft_positions',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def filter_watchlist():
"""Filter stream for watchlist aircraft"""
for message in consumer:
aircraft = message.value
hex_code = aircraft.get('hex', '').upper()
if hex_code in WATCHLIST:
producer.send('watchlist_matches', aircraft)
producer.flush()
Call Sign Filtering
import re
RPN_PATTERN = re.compile(r'^RPN\d+', re.IGNORECASE)
def is_rpn_callsign(callsign):
"""Check if call sign matches RPN pattern"""
if not callsign:
return False
return bool(RPN_PATTERN.match(callsign.strip()))
def filter_callsigns():
"""Filter for RPN call signs"""
for message in consumer:
aircraft = message.value
callsign = aircraft.get('flight', '')
if is_rpn_callsign(callsign):
aircraft['_rpn_match'] = True
producer.send('watchlist_matches', aircraft)
LADD/PIA Detection
def check_privacy_flags(aircraft):
"""Check for privacy program enrollment"""
db_flags = aircraft.get('dbFlags', 0)
return {
'is_pia': bool(db_flags & 4),
'is_ladd': bool(db_flags & 8),
'is_military': bool(db_flags & 1)
}
Geospatial Verification
Geofence Definition
GEOFENCES = [
{
"name": "Alexandria Staging",
"lat": 31.3274,
"lon": -92.5499,
"radius_miles": 30,
"type": "staging_hub"
},
{
"name": "Mesa Gateway",
"lat": 33.3078,
"lon": -111.6556,
"radius_miles": 30,
"type": "staging_hub"
},
{
"name": "Brownsville",
"lat": 25.9069,
"lon": -97.4258,
"radius_miles": 30,
"type": "staging_hub"
}
]
Haversine Geofencing
from math import radians, sin, cos, sqrt, atan2
def haversine(lat1, lon1, lat2, lon2):
"""Calculate great-circle distance in miles"""
R = 3959
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
def check_geofences(aircraft):
"""Check if aircraft is within any geofence"""
lat = aircraft.get('lat')
lon = aircraft.get('lon')
if lat is None or lon is None:
return None
for fence in GEOFENCES:
distance = haversine(lat, lon, fence['lat'], fence['lon'])
if distance <= fence['radius_miles']:
return {
'geofence': fence['name'],
'distance_miles': round(distance, 1),
'type': fence['type']
}
return None
Descent Profile Analysis
def analyze_descent_profile(aircraft, history):
"""Determine if aircraft is landing"""
current_alt = aircraft.get('alt_baro', 40000)
baro_rate = aircraft.get('baro_rate', 0)
# Check for descent
if baro_rate >= 0:
return {'landing': False, 'phase': 'level_or_climbing'}
# Check altitude threshold
if current_alt > 10000:
return {'landing': False, 'phase': 'high_altitude_descent'}
# Check for sustained descent
recent_positions = history[-5:]
altitudes = [p.get('alt_baro', 0) for p in recent_positions]
if all(altitudes[i] >= altitudes[i+1] for i in range(len(altitudes)-1)):
return {
'landing': True,
'phase': 'final_approach',
'current_altitude': current_alt,
'descent_rate': baro_rate
}
return {'landing': False, 'phase': 'variable_descent'}
False Positive Mitigation
Multi-Condition Verification
Before generating an alert, the system must verify multiple conditions:
| Condition | Threshold | Purpose |
|---|---|---|
| Watchlist match | Hex code in list | Basic identification |
| Geofence penetration | Within radius | Geographic relevance |
| Altitude | <10,000 ft | Landing probability |
| Descent rate | Negative baro_rate | Active approach |
| Sustained descent | 3+ consecutive readings | Not a temporary dip |
Verification Logic
def should_alert(aircraft, history, geofence_result):
"""Multi-condition verification before alerting"""
# Condition 1: Must be in watchlist (already filtered)
# Condition 2: Must be within geofence
if geofence_result is None:
return False, "Not within any geofence"
# Condition 3: Low altitude
altitude = aircraft.get('alt_baro', 40000)
if altitude > 10000:
return False, f"Altitude too high: {altitude} ft"
# Condition 4: Descending
baro_rate = aircraft.get('baro_rate', 0)
if baro_rate >= 0:
return False, "Not descending"
# Condition 5: Sustained descent pattern
if len(history) >= 3:
recent_alts = [h.get('alt_baro', 0) for h in history[-3:]]
if not all(recent_alts[i] >= recent_alts[i+1]
for i in range(len(recent_alts)-1)):
return False, "Descent not sustained"
return True, "All conditions met"
Alert Distribution
Signal Integration
Using signal-cli for encrypted notifications:
# Install signal-cli
wget https://github.com/AsamK/signal-cli/releases/download/v0.11.0/signal-cli-0.11.0.tar.gz
tar xf signal-cli-0.11.0.tar.gz
import subprocess
import json
SIGNAL_CLI = "/path/to/signal-cli"
SENDER_NUMBER = "+1234567890"
GROUP_ID = "base64_encoded_group_id"
def send_signal_alert(message):
"""Send alert to Signal group"""
cmd = [
SIGNAL_CLI,
"-u", SENDER_NUMBER,
"send",
"-g", GROUP_ID,
"-m", message
]
try:
subprocess.run(cmd, check=True, timeout=30)
return True
except subprocess.CalledProcessError as e:
print(f"Signal send failed: {e}")
return False
Telegram Integration
import requests
TELEGRAM_TOKEN = "your_bot_token"
TELEGRAM_CHAT_ID = "-100xxxxxxxxx" # Group ID
def send_telegram_alert(message, tracking_url=None):
"""Send alert to Telegram channel"""
url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
# Format message with tracking link
if tracking_url:
message += f"\n\n🔗 Track: {tracking_url}"
payload = {
"chat_id": TELEGRAM_CHAT_ID,
"text": message,
"parse_mode": "Markdown",
"disable_web_page_preview": False
}
response = requests.post(url, json=payload)
return response.status_code == 200
Alert Message Format
def format_alert(aircraft, geofence_result):
"""Format alert message"""
hex_code = aircraft.get('hex', 'UNKNOWN').upper()
callsign = aircraft.get('flight', 'N/A').strip()
altitude = aircraft.get('alt_baro', 'N/A')
speed = aircraft.get('gs', 'N/A')
tracking_url = f"https://globe.adsbexchange.com/?icao={hex_code.lower()}"
message = f"""
⚠️ FLIGHT ALERT
Aircraft: {hex_code}
Call Sign: {callsign}
Altitude: {altitude} ft
Speed: {speed} kts
Location: {geofence_result['geofence']}
Distance: {geofence_result['distance_miles']} mi
Track: {tracking_url}
"""
return message.strip()
Webhook Integration
Generic Webhook
import requests
import json
WEBHOOK_URLS = [
"https://hooks.example.com/alert",
"https://dispatch.rapidresponse.org/webhook"
]
def send_webhook_alert(aircraft, geofence_result, verified_at):
"""Send alert via webhooks"""
payload = {
"type": "flight_alert",
"timestamp": verified_at.isoformat(),
"aircraft": {
"hex": aircraft.get('hex'),
"callsign": aircraft.get('flight'),
"altitude": aircraft.get('alt_baro'),
"position": {
"lat": aircraft.get('lat'),
"lon": aircraft.get('lon')
}
},
"geofence": geofence_result,
"tracking_url": f"https://globe.adsbexchange.com/?icao={aircraft.get('hex')}"
}
for url in WEBHOOK_URLS:
try:
response = requests.post(
url,
json=payload,
timeout=10,
headers={"Content-Type": "application/json"}
)
print(f"Webhook {url}: {response.status_code}")
except Exception as e:
print(f"Webhook {url} failed: {e}")
Monitoring and Reliability
Health Checks
import time
from datetime import datetime, timedelta
last_message_time = None
STALE_THRESHOLD = timedelta(minutes=5)
def health_check():
"""Check system health"""
issues = []
# Check message freshness
if last_message_time:
age = datetime.utcnow() - last_message_time
if age > STALE_THRESHOLD:
issues.append(f"No messages for {age.seconds // 60} minutes")
# Check Kafka connection
try:
producer.send('health_check', {'timestamp': time.time()})
producer.flush(timeout=5)
except Exception as e:
issues.append(f"Kafka error: {e}")
return {
'healthy': len(issues) == 0,
'issues': issues,
'checked_at': datetime.utcnow().isoformat()
}
Alerting on System Failures
def alert_system_failure(component, error):
"""Alert operators to system failures"""
message = f"""
🚨 SYSTEM ALERT
Component: {component}
Error: {error}
Time: {datetime.utcnow().isoformat()}
Manual intervention may be required.
"""
send_signal_alert(message)
Related Resources
Last updated: March 25, 2026