Automation & DevOps

Python Automation Scripts for VoIP Log Analysis and Monitoring

December 06, 2024 6 min read By Amey Lokare

Python Automation Scripts for VoIP Log Analysis and Monitoring

VoIP systems generate massive amounts of log data. I've built Python scripts that automatically parse, analyze, and alert on Asterisk logs, helping identify issues before they impact users.

🎯 Why Automate Log Analysis?

Manual log analysis is:

  • Time-consuming (thousands of lines per day)
  • Error-prone (easy to miss critical issues)
  • Reactive (problems found after they occur)
Automated analysis provides:

  • Real-time alerts for critical issues
  • Trend analysis over time
  • Proactive monitoring before problems escalate

🏗 Architecture

``` Asterisk Logs → Python Parser → Database → Alert System → Dashboard ```

Components

1. Log Parser: Reads and parses Asterisk logs 2. Event Analyzer: Detects patterns and issues 3. Database: Stores analyzed data 4. Alert System: Sends notifications 5. Dashboard: Visualizes trends

💻 Implementation

1. Log Parser

```python

log_parser.py

import re from datetime import datetime from collections import defaultdict

class AsteriskLogParser: def __init__(self, log_file): self.log_file = log_file self.events = []

def parse(self): with open(self.log_file, 'r') as f: for line in f: event = self.parse_line(line) if event: self.events.append(event) return self.events

def parse_line(self, line): # Asterisk log format: [timestamp] level message pattern = r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+): (.*)' match = re.match(pattern, line)

if match: timestamp, level, message = match.groups() return { 'timestamp': datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S'), 'level': level, 'message': message, 'raw': line } return None ```

2. Event Detection

```python

event_detector.py

class EventDetector: def __init__(self): self.errors = [] self.warnings = [] self.calls = []

def detect_errors(self, events): error_patterns = [ r'ERROR', r'CRITICAL', r'Failed', r'Timeout', r'Connection refused' ]

for event in events: for pattern in error_patterns: if re.search(pattern, event['message'], re.IGNORECASE): self.errors.append(event) break

return self.errors

def detect_call_events(self, events): call_patterns = { 'call_start': r'Call from (\d+) to (\d+)', 'call_end': r'Hangup.*duration: (\d+)', 'call_failed': r'Call failed.*reason: (.*)' }

for event in events: for event_type, pattern in call_patterns.items(): match = re.search(pattern, event['message']) if match: self.calls.append({ 'type': event_type, 'timestamp': event['timestamp'], 'details': match.groups() })

return self.calls ```

3. Database Storage

```python

database.py

import sqlite3 from datetime import datetime

class LogDatabase: def __init__(self, db_file='asterisk_logs.db'): self.conn = sqlite3.connect(db_file) self.create_tables()

def create_tables(self): cursor = self.conn.cursor()

cursor.execute(''' CREATE TABLE IF NOT EXISTS errors ( id INTEGER PRIMARY KEY, timestamp TEXT, level TEXT, message TEXT, resolved INTEGER DEFAULT 0 ) ''')

cursor.execute(''' CREATE TABLE IF NOT EXISTS calls ( id INTEGER PRIMARY KEY, timestamp TEXT, event_type TEXT, details TEXT ) ''')

self.conn.commit()

def insert_error(self, error): cursor = self.conn.cursor() cursor.execute(''' INSERT INTO errors (timestamp, level, message) VALUES (?, ?, ?) ''', (error['timestamp'].isoformat(), error['level'], error['message'])) self.conn.commit()

def get_recent_errors(self, hours=24): cursor = self.conn.cursor() cursor.execute(''' SELECT * FROM errors WHERE timestamp > datetime('now', '-{} hours') AND resolved = 0 ORDER BY timestamp DESC '''.format(hours)) return cursor.fetchall() ```

4. Alert System

```python

alerts.py

import smtplib from email.mime.text import MIMEText

class AlertSystem: def __init__(self, smtp_config): self.smtp_config = smtp_config

def send_alert(self, subject, message, recipients): msg = MIMEText(message) msg['Subject'] = subject msg['From'] = self.smtp_config['from'] msg['To'] = ', '.join(recipients)

with smtplib.SMTP(self.smtp_config['host'], self.smtp_config['port']) as server: server.starttls() server.login(self.smtp_config['user'], self.smtp_config['password']) server.send_message(msg)

def check_and_alert(self, errors, threshold=10): if len(errors) > threshold: subject = f"High Error Rate: {len(errors)} errors detected" message = f"Found {len(errors)} errors in the last hour:\n\n" for error in errors[:10]: # First 10 message += f"{error['timestamp']}: {error['message']}\n"

self.send_alert(subject, message, ['admin@example.com']) ```

5. Complete Script

```python

monitor_asterisk.py

#!/usr/bin/env python3 import time from log_parser import AsteriskLogParser from event_detector import EventDetector from database import LogDatabase from alerts import AlertSystem

class AsteriskMonitor: def __init__(self, config): self.config = config self.parser = AsteriskLogParser(config['log_file']) self.detector = EventDetector() self.db = LogDatabase(config['db_file']) self.alerts = AlertSystem(config['smtp'])

def run(self): while True: # Parse recent logs events = self.parser.parse()

# Detect issues errors = self.detector.detect_errors(events) calls = self.detector.detect_call_events(events)

# Store in database for error in errors: self.db.insert_error(error)

# Check for alerts recent_errors = self.db.get_recent_errors(hours=1) self.alerts.check_and_alert(recent_errors)

# Wait before next check time.sleep(self.config['check_interval']) ```

📊 Advanced Analysis

1. Call Quality Metrics

```python def analyze_call_quality(calls): metrics = { 'total_calls': len(calls), 'failed_calls': sum(1 for c in calls if c['type'] == 'call_failed'), 'avg_duration': 0, 'success_rate': 0 }

durations = [int(c['details'][0]) for c in calls if c['type'] == 'call_end'] if durations: metrics['avg_duration'] = sum(durations) / len(durations)

metrics['success_rate'] = ( (metrics['total_calls'] - metrics['failed_calls']) / metrics['total_calls'] * 100 )

return metrics ```

2. Trend Analysis

```python import pandas as pd

def analyze_trends(db_file, days=7): conn = sqlite3.connect(db_file)

# Load errors errors_df = pd.read_sql_query(''' SELECT DATE(timestamp) as date, COUNT(*) as count FROM errors WHERE timestamp > datetime('now', '-{} days') GROUP BY DATE(timestamp) '''.format(days), conn)

# Detect trends if len(errors_df) > 1: trend = 'increasing' if errors_df['count'].iloc[-1] > errors_df['count'].iloc[0] else 'decreasing' return { 'trend': trend, 'data': errors_df.to_dict('records') }

return None ```

3. Real-Time Monitoring

```python

Use watchdog to monitor log file changes

from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler

class LogFileHandler(FileSystemEventHandler): def __init__(self, monitor): self.monitor = monitor

def on_modified(self, event): if event.src_path == self.monitor.config['log_file']: self.monitor.run()

Setup real-time monitoring

observer = Observer() observer.schedule(LogFileHandler(monitor), path='.', recursive=False) observer.start() ```

🚀 Production Deployment

Systemd Service

```ini

/etc/systemd/system/asterisk-monitor.service

[Unit] Description=Asterisk Log Monitor After=network.target

[Service] Type=simple User=monitor WorkingDirectory=/opt/asterisk-monitor ExecStart=/usr/bin/python3 /opt/asterisk-monitor/monitor_asterisk.py Restart=always

[Install] WantedBy=multi-user.target ```

Cron for Periodic Analysis

```bash

/etc/cron.hourly/asterisk-analysis

#!/bin/bash /usr/bin/python3 /opt/asterisk-monitor/analyze_trends.py ```

💡 Real-World Example

I built a monitoring system for a PBX handling 10,000+ calls/day:

1. Real-time parser monitors Asterisk logs 2. Detects call failures, errors, and anomalies 3. Stores data in SQLite for analysis 4. Alerts via email when error rate exceeds threshold 5. Dashboard shows trends and metrics

Result: 90% faster issue detection, proactive alerts, comprehensive call analytics.

🎓 Key Takeaways

  • Automate log parsing to save time
  • Detect patterns to identify issues early
  • Store data for trend analysis
  • Alert on critical issues
  • Monitor in real-time for immediate response

Conclusion

Python automation for VoIP log analysis transforms reactive troubleshooting into proactive monitoring. By parsing logs, detecting patterns, and alerting on issues, you can maintain high-quality VoIP services with minimal manual intervention.

Comments

Leave a Comment

Related Posts