Python Automation Scripts for VoIP Log Analysis and Monitoring
VoIP systems generate massive amounts of log data. I've built Python scripts that automatically parse, analyze, and alert on Asterisk logs, helping identify issues before they impact users.
🎯 Why Automate Log Analysis?
Manual log analysis is:
- Time-consuming (thousands of lines per day)
- Error-prone (easy to miss critical issues)
- Reactive (problems found after they occur)
Automation provides:
- Real-time alerts for critical issues
- Trend analysis over time
- Proactive monitoring before problems escalate
🏗 Architecture
```
Asterisk Logs → Python Parser → Database → Alert System → Dashboard
```
Components
1. Log Parser: Reads and parses Asterisk logs
2. Event Analyzer: Detects patterns and issues
3. Database: Stores analyzed data
4. Alert System: Sends notifications
5. Dashboard: Visualizes trends
💻 Implementation
1. Log Parser
```python
# log_parser.py
import re
from datetime import datetime


class AsteriskLogParser:
    # Expected line shape: [timestamp] LEVEL: message
    # Assumes a numeric date format (e.g. dateformat=%F %T in logger.conf).
    # The optional bracket groups tolerate suffixes such as LEVEL[1234]
    # that Asterisk may append to the level.
    LINE_PATTERN = re.compile(
        r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+)(?:\[[^\]]*\])*: (.*)'
    )

    def __init__(self, log_file):
        self.log_file = log_file
        self.events = []

    def parse(self):
        # Start from an empty list so repeated calls don't accumulate duplicates
        self.events = []
        with open(self.log_file, 'r') as f:
            for line in f:
                event = self.parse_line(line)
                if event:
                    self.events.append(event)
        return self.events

    def parse_line(self, line):
        match = self.LINE_PATTERN.match(line)
        if match:
            timestamp, level, message = match.groups()
            return {
                'timestamp': datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S'),
                'level': level,
                'message': message,
                'raw': line
            }
        return None
```
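Because `parse()` rereads the whole file on every call, a long-running monitor ends up processing the same lines again and again. Here's a minimal sketch of one way around that, assuming the log is only appended to between reads and that a shrinking file means it was rotated. `read_new_lines` and its offset bookkeeping are hypothetical helpers, not part of the scripts above, and the log lines in the demonstration are made up.

```python
import os
import tempfile


def read_new_lines(path, offset=0):
    """Return (lines, new_offset) for content appended to `path` since `offset`."""
    size = os.path.getsize(path)
    if size < offset:
        # The file shrank, so it was probably rotated or truncated: start over
        offset = 0
    with open(path, 'rb') as f:
        f.seek(offset)
        data = f.read()
    # Only consume complete lines; a partial trailing line waits for the next call
    end = data.rfind(b'\n') + 1
    lines = data[:end].decode('utf-8', errors='replace').splitlines()
    return lines, offset + end


# Demonstration on a temporary file with made-up log lines
with tempfile.NamedTemporaryFile('w', suffix='.log', delete=False) as tmp:
    tmp.write('[2024-01-01 10:00:00] NOTICE: first line\n')
    demo_path = tmp.name

first_batch, pos = read_new_lines(demo_path)

with open(demo_path, 'a') as f:
    f.write('[2024-01-01 10:00:05] ERROR: second line\n')

# Only the newly appended line comes back on the second call
second_batch, pos = read_new_lines(demo_path, pos)
os.remove(demo_path)
```

Each returned line can be passed straight to `parse_line`. The offset would need to be persisted somewhere (a small state file, for instance) if the monitor should survive restarts without reprocessing old lines.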
2. Event Detection
```python
# event_detector.py
import re


class EventDetector:
    def __init__(self):
        self.errors = []
        self.warnings = []
        self.calls = []

    def detect_errors(self, events):
        error_patterns = [
            r'ERROR',
            r'CRITICAL',
            r'Failed',
            r'Timeout',
            r'Connection refused'
        ]

        # Rebuild the list on each call so repeated runs don't accumulate duplicates
        self.errors = []
        for event in events:
            # The severity is in event['level'], so check it as well as the message text
            if event['level'].upper() in ('ERROR', 'CRITICAL'):
                self.errors.append(event)
                continue
            for pattern in error_patterns:
                if re.search(pattern, event['message'], re.IGNORECASE):
                    self.errors.append(event)
                    break

        return self.errors

    def detect_call_events(self, events):
        call_patterns = {
            'call_start': r'Call from (\d+) to (\d+)',
            'call_end': r'Hangup.*duration: (\d+)',
            'call_failed': r'Call failed.*reason: (.*)'
        }

        self.calls = []
        for event in events:
            for event_type, pattern in call_patterns.items():
                match = re.search(pattern, event['message'])
                if match:
                    self.calls.append({
                        'type': event_type,
                        'timestamp': event['timestamp'],
                        'details': match.groups()
                    })

        return self.calls
```
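To see what the call patterns actually capture, here's a small standalone check. The sample messages are invented to fit the patterns; real Asterisk output depends on your dialplan and verbosity settings, so treat the patterns as placeholders to adapt. `classify` is a hypothetical helper used only for this illustration.

```python
import re

# Same patterns as in detect_call_events
CALL_PATTERNS = {
    'call_start': r'Call from (\d+) to (\d+)',
    'call_end': r'Hangup.*duration: (\d+)',
    'call_failed': r'Call failed.*reason: (.*)',
}


def classify(message):
    """Return (event_type, captured_groups) for the first matching pattern, else None."""
    for event_type, pattern in CALL_PATTERNS.items():
        match = re.search(pattern, message)
        if match:
            return event_type, match.groups()
    return None


# Made-up messages, shaped to match the patterns above
samples = [
    'Call from 1001 to 1002',
    'Hangup on channel 1001, duration: 42',
    'Call failed for 1003, reason: busy',
    'Registered SIP peer 1001',
]
results = [classify(m) for m in samples]

for message, result in zip(samples, results):
    print(f'{message!r} -> {result}')
```

Note that the captured groups are always strings, which is why the duration needs an `int()` conversion before any arithmetic.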
3. Database Storage
```python
# database.py
import sqlite3


class LogDatabase:
    def __init__(self, db_file='asterisk_logs.db'):
        self.conn = sqlite3.connect(db_file)
        # Return rows that support access by column name, e.g. row['message']
        self.conn.row_factory = sqlite3.Row
        self.create_tables()

    def create_tables(self):
        cursor = self.conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS errors (
                id INTEGER PRIMARY KEY,
                timestamp TEXT,
                level TEXT,
                message TEXT,
                resolved INTEGER DEFAULT 0
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS calls (
                id INTEGER PRIMARY KEY,
                timestamp TEXT,
                event_type TEXT,
                details TEXT
            )
        ''')

        self.conn.commit()

    def insert_error(self, error):
        cursor = self.conn.cursor()
        # Store 'YYYY-MM-DD HH:MM:SS' so text comparison against datetime() works
        cursor.execute('''
            INSERT INTO errors (timestamp, level, message)
            VALUES (?, ?, ?)
        ''', (error['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),
              error['level'], error['message']))
        self.conn.commit()

    def get_recent_errors(self, hours=24):
        cursor = self.conn.cursor()
        # Bind the offset as a parameter instead of formatting it into the SQL.
        # 'localtime' assumes the log timestamps are in local time.
        cursor.execute('''
            SELECT * FROM errors
            WHERE timestamp > datetime('now', 'localtime', ?)
            AND resolved = 0
            ORDER BY timestamp DESC
        ''', (f'-{int(hours)} hours',))
        return cursor.fetchall()
```
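The time-window query compares timestamps as text, so the stored format has to match what SQLite's `datetime()` produces (`YYYY-MM-DD HH:MM:SS`). A minimal sketch using an in-memory database shows the idea. It assumes timestamps are stored in local time; the table is a trimmed-down version of `errors`, and `recent_errors` is a hypothetical stand-in for `get_recent_errors`.

```python
import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row  # allows row['message'] style access
conn.execute(
    'CREATE TABLE errors (id INTEGER PRIMARY KEY, timestamp TEXT, message TEXT)'
)

# Two made-up entries: one five minutes old, one three hours old
now = datetime.now()
rows = [
    (now - timedelta(minutes=5), 'recent failure'),
    (now - timedelta(hours=3), 'older failure'),
]
for ts, message in rows:
    conn.execute(
        'INSERT INTO errors (timestamp, message) VALUES (?, ?)',
        (ts.strftime('%Y-%m-%d %H:%M:%S'), message),
    )


def recent_errors(conn, hours):
    """Return rows newer than `hours` ago, with the offset bound as a parameter."""
    modifier = f'-{int(hours)} hours'
    return conn.execute(
        "SELECT * FROM errors "
        "WHERE timestamp > datetime('now', 'localtime', ?) "
        "ORDER BY timestamp DESC",
        (modifier,),
    ).fetchall()


last_hour = recent_errors(conn, 1)
last_day = recent_errors(conn, 24)
print([row['message'] for row in last_hour])
print(len(last_day))
```

If timestamps were stored with a `T` separator instead, every entry from the current day would compare as newer than the cutoff, because `T` sorts after the space character.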
4. Alert System
```python
# alerts.py
import smtplib
from email.mime.text import MIMEText


class AlertSystem:
    def __init__(self, smtp_config):
        self.smtp_config = smtp_config

    def send_alert(self, subject, message, recipients):
        msg = MIMEText(message)
        msg['Subject'] = subject
        msg['From'] = self.smtp_config['from']
        msg['To'] = ', '.join(recipients)

        with smtplib.SMTP(self.smtp_config['host'], self.smtp_config['port']) as server:
            server.starttls()
            server.login(self.smtp_config['user'], self.smtp_config['password'])
            server.send_message(msg)

    def check_and_alert(self, errors, threshold=10):
        if len(errors) > threshold:
            subject = f"High Error Rate: {len(errors)} errors detected"
            message = f"Found {len(errors)} errors in the last hour:\n\n"
            for error in errors[:10]:  # First 10
                message += f"{error['timestamp']}: {error['message']}\n"

            self.send_alert(subject, message, ['admin@example.com'])
```
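One thing to watch with a threshold check like this: if it runs every minute against a one-hour window, the same burst of errors can trigger an email on every pass. A minimal sketch of a cooldown that could sit in front of `send_alert` follows. `AlertThrottle` is a hypothetical helper, and the 15-minute cooldown is an arbitrary assumption rather than a recommendation from the original setup.

```python
import time


class AlertThrottle:
    """Allow at most one alert per `cooldown` seconds (hypothetical helper)."""

    def __init__(self, cooldown=900, clock=time.monotonic):
        self.cooldown = cooldown
        self.clock = clock  # injectable so the logic can be tested without waiting
        self._last_sent = None

    def should_send(self, error_count, threshold=10):
        # Below or at the threshold: never alert
        if error_count <= threshold:
            return False
        now = self.clock()
        # Above the threshold but still inside the cooldown: stay quiet
        if self._last_sent is not None and now - self._last_sent < self.cooldown:
            return False
        self._last_sent = now
        return True


# Demonstration with a fake clock so the example runs instantly
fake_time = [0.0]
throttle = AlertThrottle(cooldown=900, clock=lambda: fake_time[0])

decisions = []
for t, count in [(0, 25), (60, 30), (120, 5), (1000, 12)]:
    fake_time[0] = t
    decisions.append(throttle.should_send(count))

print(decisions)
```

In `check_and_alert`, the call to `send_alert` would then be wrapped in `if throttle.should_send(len(errors), threshold):`.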
5. Complete Script
```python
#!/usr/bin/env python3
# monitor_asterisk.py
import time
from log_parser import AsteriskLogParser
from event_detector import EventDetector
from database import LogDatabase
from alerts import AlertSystem


class AsteriskMonitor:
    def __init__(self, config):
        self.config = config
        self.parser = AsteriskLogParser(config['log_file'])
        self.detector = EventDetector()
        self.db = LogDatabase(config['db_file'])
        self.alerts = AlertSystem(config['smtp'])

    def run(self):
        while True:
            # Parse recent logs
            events = self.parser.parse()

            # Detect issues
            errors = self.detector.detect_errors(events)
            calls = self.detector.detect_call_events(events)

            # Store in database
            for error in errors:
                self.db.insert_error(error)

            # Check for alerts
            recent_errors = self.db.get_recent_errors(hours=1)
            self.alerts.check_and_alert(recent_errors)

            # Wait before next check
            time.sleep(self.config['check_interval'])
```
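The monitor reads `log_file`, `db_file`, `smtp`, and `check_interval` from its config, but the script doesn't show where that config comes from. Here's a minimal sketch of loading it from JSON and failing early on missing keys. The key names are the ones the classes above read; `load_config` is a hypothetical helper, and every value in the example (paths, host, credentials, interval) is a placeholder assumption.

```python
import json

# Keys the monitor classes read from the config dict
REQUIRED_KEYS = {'log_file', 'db_file', 'smtp', 'check_interval'}
REQUIRED_SMTP_KEYS = {'host', 'port', 'user', 'password', 'from'}


def load_config(text):
    """Parse a JSON config string and fail early if a required key is missing."""
    config = json.loads(text)
    missing = REQUIRED_KEYS - config.keys()
    if missing:
        raise ValueError(f"missing config keys: {sorted(missing)}")
    missing_smtp = REQUIRED_SMTP_KEYS - config['smtp'].keys()
    if missing_smtp:
        raise ValueError(f"missing smtp keys: {sorted(missing_smtp)}")
    return config


# Placeholder values for illustration only; substitute your own
EXAMPLE_CONFIG = '''
{
  "log_file": "/var/log/asterisk/full",
  "db_file": "asterisk_logs.db",
  "check_interval": 60,
  "smtp": {
    "host": "smtp.example.com",
    "port": 587,
    "user": "monitor",
    "password": "change-me",
    "from": "monitor@example.com"
  }
}
'''

config = load_config(EXAMPLE_CONFIG)

# With the modules above importable, the monitor would then be started with:
# AsteriskMonitor(config).run()
```

In practice the JSON would live in a file readable only by the service user, since it contains the SMTP password.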
📊 Advanced Analysis
1. Call Quality Metrics
```python
def analyze_call_quality(calls):
    # `calls` is the list of call events from EventDetector, so totals are per event
    metrics = {
        'total_calls': len(calls),
        'failed_calls': sum(1 for c in calls if c['type'] == 'call_failed'),
        'avg_duration': 0,
        'success_rate': 0
    }

    durations = [int(c['details'][0]) for c in calls if c['type'] == 'call_end']
    if durations:
        metrics['avg_duration'] = sum(durations) / len(durations)

    # Guard against division by zero when no call events were found
    if metrics['total_calls']:
        metrics['success_rate'] = (
            (metrics['total_calls'] - metrics['failed_calls']) /
            metrics['total_calls'] * 100
        )

    return metrics
```
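`analyze_call_quality` counts every event in `calls`, including `call_start` entries, so its success rate is a per-event figure rather than a per-call one. Here's a sketch of an alternative that counts only finished attempts, under the assumption that every call eventually produces either a `call_end` or a `call_failed` event. `summarize_calls` and the sample events are hypothetical.

```python
def summarize_calls(calls):
    """Compute metrics per finished call attempt instead of per logged event."""
    completed = [c for c in calls if c['type'] == 'call_end']
    failed = [c for c in calls if c['type'] == 'call_failed']
    attempts = len(completed) + len(failed)
    durations = [int(c['details'][0]) for c in completed]
    return {
        'attempts': attempts,
        'failed_calls': len(failed),
        'avg_duration': sum(durations) / len(durations) if durations else 0,
        'success_rate': len(completed) / attempts * 100 if attempts else 0,
    }


# Made-up events in the shape produced by detect_call_events
sample_calls = [
    {'type': 'call_start', 'details': ('1001', '1002')},
    {'type': 'call_end', 'details': ('30',)},
    {'type': 'call_start', 'details': ('1001', '1003')},
    {'type': 'call_end', 'details': ('90',)},
    {'type': 'call_start', 'details': ('1002', '1004')},
    {'type': 'call_failed', 'details': ('busy',)},
]

summary = summarize_calls(sample_calls)
print(summary)
```

With these six events, two of three attempts completed, whereas the per-event calculation would report five of six as successful.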
2. Trend Analysis
```python
import sqlite3

import pandas as pd


def analyze_trends(db_file, days=7):
    conn = sqlite3.connect(db_file)

    # Load daily error counts, oldest day first.
    # 'localtime' assumes the stored timestamps are in local time.
    errors_df = pd.read_sql_query('''
        SELECT DATE(timestamp) as date, COUNT(*) as count
        FROM errors
        WHERE timestamp > datetime('now', 'localtime', ?)
        GROUP BY DATE(timestamp)
        ORDER BY date
    ''', conn, params=(f'-{int(days)} days',))
    conn.close()

    # Detect trends by comparing the last day against the first
    if len(errors_df) > 1:
        trend = 'increasing' if errors_df['count'].iloc[-1] > errors_df['count'].iloc[0] else 'decreasing'
        return {
            'trend': trend,
            'data': errors_df.to_dict('records')
        }

    return None
```
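Comparing only the first and last day is easy to read, but a single quiet or noisy day at either end can flip the result. One alternative, sketched below, is to fit a least-squares line through the daily counts and look at its slope. This is a swapped-in technique rather than what `analyze_trends` does; the function names and the `tolerance` of 0.5 errors per day are assumptions chosen for illustration.

```python
def trend_slope(counts):
    """Least-squares slope of counts against day index (0, 1, 2, ...)."""
    n = len(counts)
    if n < 2:
        return 0.0
    mean_x = (n - 1) / 2
    mean_y = sum(counts) / n
    numerator = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(counts))
    denominator = sum((x - mean_x) ** 2 for x in range(n))
    return numerator / denominator


def classify_trend(counts, tolerance=0.5):
    """Label the series, treating slopes within +/- tolerance as stable."""
    slope = trend_slope(counts)
    if slope > tolerance:
        return 'increasing'
    if slope < -tolerance:
        return 'decreasing'
    return 'stable'


# Made-up daily error counts: steadily rising, then one quiet final day
daily_counts = [5, 6, 8, 9, 12, 14, 4]

naive = 'increasing' if daily_counts[-1] > daily_counts[0] else 'decreasing'
fitted = classify_trend(daily_counts)
print(naive, fitted)
```

The `count` column from the trend query can be passed in directly with `errors_df['count'].tolist()`.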
3. Real-Time Monitoring
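```python
# Use watchdog to monitor log file changes
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class LogFileHandler(FileSystemEventHandler):
    def __init__(self, monitor):
        self.monitor = monitor
        self.log_path = os.path.abspath(monitor.config['log_file'])

    def on_modified(self, event):
        if os.path.abspath(event.src_path) == self.log_path:
            # Do a single pass here: monitor.run() loops forever and
            # would block the observer thread
            m = self.monitor
            errors = m.detector.detect_errors(m.parser.parse())
            for error in errors:
                m.db.insert_error(error)
            m.alerts.check_and_alert(m.db.get_recent_errors(hours=1))


# Set up real-time monitoring (monitor is an existing AsteriskMonitor instance)
handler = LogFileHandler(monitor)
observer = Observer()
# Watch the directory that contains the log file
observer.schedule(handler, path=os.path.dirname(handler.log_path), recursive=False)
observer.start()
```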
🚀 Production Deployment
Systemd Service
```ini
# /etc/systemd/system/asterisk-monitor.service
[Unit]
Description=Asterisk Log Monitor
After=network.target

[Service]
Type=simple
User=monitor
WorkingDirectory=/opt/asterisk-monitor
ExecStart=/usr/bin/python3 /opt/asterisk-monitor/monitor_asterisk.py
Restart=always

[Install]
WantedBy=multi-user.target
```
Cron for Periodic Analysis
```bash
#!/bin/bash
# /etc/cron.hourly/asterisk-analysis
/usr/bin/python3 /opt/asterisk-monitor/analyze_trends.py
```
💡 Real-World Example
I built a monitoring system for a PBX handling 10,000+ calls/day:
1. Real-time parser monitors Asterisk logs
2. Detects call failures, errors, and anomalies
3. Stores data in SQLite for analysis
4. Alerts via email when error rate exceeds threshold
5. Dashboard shows trends and metrics
Result: 90% faster issue detection, proactive alerts, comprehensive call analytics.
🎓 Key Takeaways
- Automate log parsing to save time
- Detect patterns to identify issues early
- Store data for trend analysis
- Alert on critical issues
- Monitor in real-time for immediate response
Conclusion
Python automation for VoIP log analysis transforms reactive troubleshooting into proactive monitoring. By parsing logs, detecting patterns, and alerting on issues, you can maintain high-quality VoIP services with minimal manual intervention.