Log Analyzer
2026-03-28
Parse, search, and debug application logs. Covers plain-text logs, structured JSON logs, stack traces, multi-service correlation, and live monitoring.
When to use
- Debugging application errors from log files
- Searching logs for specific patterns, errors, or request IDs
- Parsing and analyzing stack traces
- Setting up structured (JSON) logging in an application
- Correlating events across multiple services or log files
- Monitoring logs in real time during development
- Generating error frequency reports or summaries
Quick search patterns
Finding errors and exceptions
# All errors in a log file
grep -i 'error\|exception\|fatal\|panic\|fail' app.log
# Errors with 3 lines of context
grep -i -C 3 'error\|exception' app.log
# Errors in the last hour (ISO timestamps)
HOUR_AGO=$(date -u -d '1 hour ago' '+%Y-%m-%dT%H:%M' 2>/dev/null || date -u -v-1H '+%Y-%m-%dT%H:%M')
awk -v t="$HOUR_AGO" '$0 ~ /^[0-9]{4}-[0-9]{2}-[0-9]{2}T/ && $1 >= t' app.log | grep -i 'error'
# Count errors by type
grep -oP '(?:Error|Exception): \K[^\n]+' app.log | sort | uniq -c | sort -rn | head -20
# HTTP 5xx errors from access logs
awk '$9 >= 500' access.log
Searching by request or correlation ID
# Trace a single request across log entries
grep 'req-abc123' app.log
# Across multiple files
grep -r 'req-abc123' /var/log/myapp/
# Across multiple services (with filename prefix)
grep -rH 'correlation-id-xyz' /var/log/service-a/ /var/log/service-b/ /var/log/service-c/
Time-range filtering
# Between two timestamps (ISO format)
awk '$0 >= "2026-02-03T10:00" && $0 <= "2026-02-03T11:00"' app.log
# Last N lines (tail)
tail -1000 app.log | grep -i error
# Since a specific time (GNU date)
awk -v start="$(date -d '30 minutes ago' '+%Y-%m-%dT%H:%M')" '$1 >= start' app.log
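When timestamps vary in precision, or the lexicographic comparison above is too fragile, a short Python filter is more robust. A minimal sketch; the leading "YYYY-MM-DDTHH:MM:SS" prefix is an assumption about your log format:

```python
from datetime import datetime, timedelta

def filter_since(lines, minutes=30, now=None):
    """Yield lines whose leading ISO timestamp falls in the last N minutes."""
    now = now or datetime.utcnow()
    cutoff = now - timedelta(minutes=minutes)
    for line in lines:
        try:
            # Parse the leading "YYYY-MM-DDTHH:MM:SS" prefix
            ts = datetime.strptime(line[:19], '%Y-%m-%dT%H:%M:%S')
        except ValueError:
            continue  # line has no leading ISO timestamp
        if ts >= cutoff:
            yield line

# Example with a fixed "now" so the result is deterministic
sample = [
    "2026-02-03T10:00:00 INFO ok",
    "2026-02-03T09:00:00 INFO old",
    "no timestamp here",
]
recent = list(filter_since(sample, minutes=30,
                           now=datetime(2026, 2, 3, 10, 15, 0)))
```

Lines without a parseable timestamp are skipped rather than crashing the filter, which matters on mixed-format logs.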
JSON / structured logs
Parsing with jq
# Pretty-print JSON logs
cat app.log | jq '.'
# Filter by level
cat app.log | jq 'select(.level == "error")'
# Filter by time range
cat app.log | jq 'select(.timestamp >= "2026-02-03T10:00:00Z")'
# Extract specific fields
cat app.log | jq -r '[.timestamp, .level, .message] | @tsv'
# Count by level
cat app.log | jq -r '.level' | sort | uniq -c | sort -rn
# Filter by nested field
cat app.log | jq 'select(.context.userId == "user-123")'
# Group errors by message
cat app.log | jq -r 'select(.level == "error") | .message' | sort | uniq -c | sort -rn
# Extract request duration stats
cat app.log | jq -r 'select(.duration != null) | .duration' | awk '{sum+=$1; count++; if($1>max)max=$1} END {print "count="count, "avg="sum/count, "max="max}'
Parsing mixed-format logs (JSON lines interleaved with plain text)
# Extract only valid JSON lines
while IFS= read -r line; do
  echo "$line" | jq '.' 2>/dev/null
done < app.log
# Or with grep for lines starting with {
grep '^\s*{' app.log | jq '.'
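A Python equivalent is handy when you want both streams: JSON entries parsed, plain-text lines kept for context. A minimal sketch; it only accepts JSON objects, since a bare number or quoted string on a line would also parse as valid JSON:

```python
import json

def split_mixed(lines):
    """Split a mixed log into parsed JSON entries and plain-text lines."""
    entries, plain = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            plain.append(line)
            continue
        # Only accept JSON objects; "404" alone would also parse as JSON
        if isinstance(obj, dict):
            entries.append(obj)
        else:
            plain.append(line)
    return entries, plain

sample = [
    '{"level": "error", "msg": "boom"}',
    'Plain text warning: disk nearly full',
]
entries, plain = split_mixed(sample)
```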
Stack trace analysis
Extracting and deduplicating stack traces
# Extract Java/Kotlin stack traces (starts with Exception/Error, followed by \tat lines)
awk '/Exception|Error/{trace=$0; while(getline && /^\t/) trace=trace"\n"$0; print trace"\n---"}' app.log
# Extract Python tracebacks
awk '/^Traceback/{p=1} p{print} /^[A-Za-z].*Error/{if(p) print "---"; p=0}' app.log
# Extract Node.js stack traces (Error + indented "at" lines)
awk '/Error:/{trace=$0; while(getline && /^ at /) trace=trace"\n"$0; print trace"\n---"}' app.log
# Deduplicate: group by root cause (first line of trace)
awk '/Exception|Error:/{cause=$0} /^\tat|^ at /{next} cause{print cause; cause=""}' app.log | sort | uniq -c | sort -rn
Python traceback parser
#!/usr/bin/env python3
"""Parse Python tracebacks from log files and group by root cause."""
import sys
import re
from collections import Counter

def extract_tracebacks(filepath):
    tracebacks = []
    current = []
    in_trace = False
    with open(filepath) as f:
        for line in f:
            if line.startswith('Traceback (most recent call last):'):
                in_trace = True
                current = [line.rstrip()]
            elif in_trace:
                current.append(line.rstrip())
                # Exception line ends the traceback
                if re.match(r'^[A-Za-z]\w*(Error|Exception|Warning)', line):
                    tracebacks.append('\n'.join(current))
                    in_trace = False
                    current = []
    return tracebacks

if __name__ == '__main__':
    filepath = sys.argv[1] if len(sys.argv) > 1 else '/dev/stdin'
    traces = extract_tracebacks(filepath)
    # Group by exception type and message
    causes = Counter()
    for trace in traces:
        lines = trace.split('\n')
        cause = lines[-1] if lines else 'Unknown'
        causes[cause] += 1
    print(f"Found {len(traces)} tracebacks, {len(causes)} unique causes:\n")
    for cause, count in causes.most_common(20):
        print(f"  {count:4d}x  {cause}")
Live monitoring
Tailing with filters
# Follow log file, highlight errors in red
tail -f app.log | grep --color=always -i 'error\|warn\|$'
# Follow and filter to errors only
tail -f app.log | grep --line-buffered -i 'error\|exception'
# Follow JSON logs, pretty-print errors
tail -f app.log | while IFS= read -r line; do
  level=$(echo "$line" | jq -r '.level // empty' 2>/dev/null)
  if [ "$level" = "error" ] || [ "$level" = "fatal" ]; then
    echo "$line" | jq '.'
  fi
done
# Follow multiple files
tail -f /var/log/service-a/app.log /var/log/service-b/app.log
# Follow with timestamps (useful when log doesn't include them)
tail -f app.log | while IFS= read -r line; do
  echo "$(date '+%H:%M:%S') $line"
done
Watching for patterns and alerting
# Beep on error (terminal bell)
tail -f app.log | grep --line-buffered -i 'error' | while read -r line; do
  echo -e "\a$line"
done
# Count errors per minute
tail -f app.log | grep --line-buffered -i 'error' | while read -r line; do
  echo "$(date '+%Y-%m-%d %H:%M') ERROR"
done | uniq -c
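The per-minute count above can be turned into a threshold alert. A minimal Python sketch; the threshold value and the leading "YYYY-MM-DDTHH:MM" timestamp prefix are illustrative assumptions:

```python
from collections import Counter

def error_spikes(lines, threshold=10):
    """Count errors per minute (from a leading ISO timestamp) and
    return the minutes that exceed the threshold."""
    per_minute = Counter()
    for line in lines:
        if 'error' in line.lower():
            per_minute[line[:16]] += 1  # "YYYY-MM-DDTHH:MM"
    return {m: c for m, c in per_minute.items() if c > threshold}

# Example: three errors in the same minute, threshold 2 -> flagged
sample = ['2026-02-03T10:00:%02d ERROR boom' % s for s in range(3)]
spikes = error_spikes(sample, threshold=2)
```

In practice you would feed this from `tail -f` via stdin and trigger a notification for each flagged minute.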
Log format parsing
Common access logs (Apache/Nginx)
# Parse fields: IP, date, method, path, status, size
awk '{print $1, $9, $7}' access.log
# Top IPs by request count
awk '{print $1}' access.log | sort | uniq -c | sort -rn | head -20
# Top paths by request count
awk '{print $7}' access.log | sort | uniq -c | sort -rn | head -20
# Slow requests (response time in last field, microseconds)
awk '{if ($NF > 1000000) print $0}' access.log
# Requests per minute
awk '{split($4,a,":"); print a[1]":"a[2]":"a[3]}' access.log | uniq -c
# Status code distribution
awk '{print $9}' access.log | sort | uniq -c | sort -rn
# 4xx and 5xx with paths
awk '$9 >= 400 {print $9, $7}' access.log | sort | uniq -c | sort -rn | head -20
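For anything beyond positional field extraction, a regex parser for the combined log format is more reliable, since quoted fields can contain spaces. A minimal sketch; adjust the pattern to your server's actual log_format:

```python
import re

# Apache/Nginx common/combined log format prefix:
# IP - user [date] "METHOD path PROTO" status size
LOG_RE = re.compile(
    r'(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" (?P<status>\d{3}) (?P<size>\S+)'
)

def parse_line(line):
    """Return a dict of named fields, or None if the line doesn't match."""
    m = LOG_RE.match(line)
    return m.groupdict() if m else None

sample = ('203.0.113.9 - frank [03/Feb/2026:10:00:00 +0000] '
          '"GET /api/orders HTTP/1.1" 500 1234')
rec = parse_line(sample)
```

Returning None for non-matching lines lets you count (and inspect) unparseable entries instead of silently mis-splitting them.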
Custom-delimited logs
# Pipe-delimited: timestamp|level|service|message
awk -F'|' '{print $2, $3, $4}' app.log
# Tab-delimited
awk -F'\t' '$2 == "ERROR" {print $1, $4}' app.log
# CSV logs
python3 -c "
import csv, sys
with open(sys.argv[1]) as f:
    for row in csv.DictReader(f):
        if row.get('level') == 'error':
            print(f\"{row['timestamp']} {row['message']}\")
" app.csv
Setting up structured logging
Node.js (pino, a fast JSON logger)
// npm install pino
const pino = require('pino');
const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  // Add standard fields to every log line
  base: { service: 'my-api', version: '1.2.0' },
});
// Usage
logger.info({ userId: 'u123', action: 'login' }, 'User logged in');
logger.error({ err, requestId: req.id }, 'Request failed');
// Output: {"level":30,"time":1706900000000,"service":"my-api","userId":"u123","action":"login","msg":"User logged in"}
// Child logger with bound context
const reqLogger = logger.child({ requestId: req.id, userId: req.user?.id });
reqLogger.info('Processing order');
reqLogger.error({ err }, 'Order failed');
Python (structlog)
# pip install structlog
import structlog
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ],
)
logger = structlog.get_logger(service="my-api")
# Usage
logger.info("user_login", user_id="u123", ip="1.2.3.4")
logger.error("request_failed", request_id="req-abc", error=str(e))
# Output: {"event":"user_login","user_id":"u123","ip":"1.2.3.4","level":"info","timestamp":"2026-02-03T12:00:00Z","service":"my-api"}
Go (zerolog)
import (
    "os"

    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
)

func init() {
    zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
    log.Logger = zerolog.New(os.Stdout).With().
        Timestamp().
        Str("service", "my-api").
        Logger()
}
// Usage
log.Info().Str("userId", "u123").Msg("User logged in")
log.Error().Err(err).Str("requestId", reqID).Msg("Request failed")
Error pattern reports
Generating an error frequency report
#!/bin/bash
# error-report.sh - Summarize errors from a log file
LOG="${1:?Usage: error-report.sh <logfile>}"
echo "=== Error Report: $(basename "$LOG") ==="
echo "Generated: $(date -u '+%Y-%m-%dT%H:%M:%SZ')"
echo ""
total=$(wc -l < "$LOG")
errors=$(grep -ci 'error\|exception\|fatal' "$LOG")
warns=$(grep -ci 'warn' "$LOG")
echo "Total lines: $total"
echo "Errors: $errors"
echo "Warnings: $warns"
echo ""
echo "--- Top 15 Error Messages ---"
grep -i 'error\|exception' "$LOG" | \
  sed 's/^[0-9TZ:.+\-]* //' | \
  sed 's/\b[0-9a-f]\{8,\}\b/ID/g' | \
  sed 's/[0-9]\{1,\}/N/g' | \
  sort | uniq -c | sort -rn | head -15
echo ""
echo "--- Errors Per Hour ---"
grep -i 'error\|exception' "$LOG" | \
  grep -oP '\d{4}-\d{2}-\d{2}T\d{2}' | \
  sort | uniq -c
echo ""
echo "--- First Occurrence of Each Error Type ---"
grep -i 'error\|exception' "$LOG" | \
  sed 's/^[0-9TZ:.+\-]* //' | \
  sort -u | head -10
JSON log error report in Python
#!/usr/bin/env python3
"""Generate error summary from JSON log files."""
import json
import sys
from collections import Counter, defaultdict

def analyze_logs(filepath):
    errors = []
    levels = Counter()
    errors_by_hour = defaultdict(int)
    with open(filepath) as f:
        for line in f:
            try:
                entry = json.loads(line.strip())
            except (json.JSONDecodeError, ValueError):
                continue
            # str() guards against numeric levels (e.g. pino's "level": 50)
            level = str(entry.get('level', entry.get('severity', ''))).lower()
            levels[level] += 1
            if level in ('error', 'fatal', 'critical'):
                msg = entry.get('message', entry.get('msg', entry.get('event', 'unknown')))
                ts = entry.get('timestamp', entry.get('time', ''))
                errors.append({'message': msg, 'timestamp': ts, 'entry': entry})
                # Group by hour ("2026-02-03T12")
                try:
                    errors_by_hour[ts[:13]] += 1
                except TypeError:
                    pass  # non-string timestamp
    # Group errors by message
    error_counts = Counter(e['message'] for e in errors)
    print(f"=== Log Analysis: {filepath} ===\n")
    print("Level distribution:")
    for level, count in levels.most_common():
        print(f"  {level:10s} {count}")
    print(f"\nTotal errors: {len(errors)}")
    print(f"Unique error messages: {len(error_counts)}\n")
    print("Top 15 errors:")
    for msg, count in error_counts.most_common(15):
        print(f"  {count:4d}x  {msg[:100]}")
    if errors_by_hour:
        print("\nErrors by hour:")
        for hour in sorted(errors_by_hour):
            bar = '#' * min(errors_by_hour[hour], 50)
            print(f"  {hour} {errors_by_hour[hour]:4d} {bar}")

if __name__ == '__main__':
    analyze_logs(sys.argv[1])
Multi-service log correlation
Merging and sorting logs from multiple services
# Merge multiple time-sorted log files (ISO timestamps sort lexicographically,
# so comparing whole lines is correct; -k1,1 with -t'T' would compare dates only)
sort -m service-a.log service-b.log service-c.log > merged.log
# If files aren't individually sorted, use a full sort
sort service-*.log > merged.log
# Merge JSON logs, add source field
for f in service-*.log; do
  service=$(basename "$f" .log)
  jq --arg svc "$service" '. + {source: $svc}' "$f"
done | jq -s 'sort_by(.timestamp)[]'
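When each file is already time-sorted, Python's heapq.merge performs the same k-way merge as sort -m, streaming, without loading everything into memory. A minimal sketch assuming each line starts with an ISO timestamp:

```python
import heapq

def merge_sorted_logs(*streams):
    """K-way merge of time-sorted log streams by their leading ISO timestamp."""
    # ISO-8601 timestamps compare correctly as plain strings
    yield from heapq.merge(*streams, key=lambda line: line[:19])

a = ['2026-02-03T10:00:00 a1', '2026-02-03T10:02:00 a2']
b = ['2026-02-03T10:01:00 b1']
merged = list(merge_sorted_logs(a, b))
```

In real use the streams would be open file handles, so the merge stays constant-memory regardless of file size.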
Tracing a request across services
# Find all log entries for a correlation/request ID across all services
REQUEST_ID="req-abc-123"
grep -rH "$REQUEST_ID" /var/log/services/ | sort -t: -k2
# With JSON logs
for f in /var/log/services/*.log; do
  jq --arg rid "$REQUEST_ID" 'select(.requestId == $rid or .correlationId == $rid)' "$f" 2>/dev/null
done | jq -s 'sort_by(.timestamp)[]'
Log rotation and large files
Working with rotated/compressed logs
# Search across rotated logs (including .gz)
zgrep -i 'error' /var/log/app.log*
# Search today's and yesterday's logs
zgrep -i 'error' /var/log/app.log /var/log/app.log.1
# Decompress, filter, and recompress
zcat app.log.3.gz | grep 'ERROR' | gzip > errors-day3.gz
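In Python, a rotated set can be read uniformly by switching on the .gz suffix; gzip.open in text mode yields lines just like open. A minimal sketch:

```python
import gzip

def open_log(path):
    """Open a log file, transparently decompressing .gz."""
    if path.endswith('.gz'):
        return gzip.open(path, 'rt')
    return open(path)

def grep_rotated(pattern, paths):
    """Yield matching lines across plain and rotated (.gz) logs."""
    for path in sorted(paths):
        with open_log(path) as f:
            for line in f:
                if pattern in line:
                    yield line.rstrip('\n')

# Example: search app.log plus all rotated copies
# for line in grep_rotated('ERROR', glob.glob('/var/log/app.log*')):
#     print(line)
```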
Sampling large files
# Random sample of 1000 lines
shuf -n 1000 huge.log > sample.log
# Every 100th line
awk 'NR % 100 == 0' huge.log > sample.log
# First and last 500 lines
{ head -500 huge.log; echo "--- TRUNCATED ---"; tail -500 huge.log; } > excerpt.log
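shuf needs the whole input available; for data arriving on a pipe you can take a uniform sample in a single pass with reservoir sampling. A minimal sketch:

```python
import random

def reservoir_sample(lines, k, rng=random):
    """Uniform random sample of k lines from a stream, in one pass."""
    sample = []
    for i, line in enumerate(lines):
        if i < k:
            sample.append(line)
        else:
            # Each later item replaces a slot with probability k/(i+1)
            j = rng.randint(0, i)
            if j < k:
                sample[j] = line
    return sample

# Works on a generator, so the full stream is never held in memory
lines = (f'line {i}' for i in range(10_000))
sample = reservoir_sample(lines, 100)
```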
Tips
- Always search by request or correlation ID first; it narrows the investigation faster than timestamps or error messages.
- Use grep --line-buffered when piping from tail -f so output isn't delayed by buffering.
- Normalize IDs and numbers before grouping errors (sed 's/[0-9a-f]\{8,\}/ID/g') to merge duplicates that differ only in IDs.
- For JSON logs, jq is indispensable. Install it if it's missing: apt install jq or brew install jq.
- The setup cost of structured (JSON) logging always pays off: filtering, grouping, correlation, and alerting all become jq one-liners.
- When debugging a production issue: get the time window and the affected user/request IDs first, then filter the logs down to that range before reading anything.
- awk is faster than grep | sort | uniq -c pipelines on large files; use it for counting and aggregation.