循环ping服务器并写入日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import os
import time
import platform
from datetime import datetime, timedelta

def get_stats(numbers):
if not numbers:
raise ValueError("列表不能为空")
min_val = min(numbers)
max_val = max(numbers)
avg_val = sum(numbers) / len(numbers)

return min_val, max_val, avg_val

def ping(host):
param = '-n' if platform.system().lower() == 'windows' else '-c'
command = ['ping', param, '1', '-W', '2', host]

try:
response = os.popen(' '.join(command)).read()
if "TTL=" in response or "ttl=" in response:
if platform.system().lower() == 'windows':
time_str = response.split('时间=')[1].split('ms')[0].strip()
else:
time_str = response.split('time=')[1].split(' ')[0].strip()
return (True, float(time_str))
else:
return (False, "请求超时")
except Exception as e:
return (False, f"错误: {str(e)}")

def monitor_ping(host, duration_minutes=60):
start_time = datetime.now()
end_time = start_time + timedelta(minutes=duration_minutes)
#log_file = f"ping_log_{host.replace('.', '_')}_{start_time.strftime('%Y%m%d_%H%M%S')}.txt"
log_file = f'/opt/ping.log'

print(f"开始监控 {host},持续 {duration_minutes} 分钟...")
print(f"结果将保存到: {log_file}")

with open(log_file, 'a', encoding='utf-8') as f:
f.write(f"Ping监控日志 - 目标: {host}\n")
f.write(f"开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"预计结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("========================================\n\n")
try:
while datetime.now() < end_time:
current_time = datetime.now()
success, result = ping(host)

log_entry = f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} - "
if success:
log_entry += f"响应时间: {result} ms"
resultlist.append(result)
else:
log_entry += f"失败: {result}"
resultlist.append("0")

print(log_entry)

with open(log_file, 'a', encoding='utf-8') as f:
f.write(log_entry + "\n")

time.sleep(1)
min_val, max_val, avg_val = get_stats(resultlist)
with open(log_file, 'a', encoding='utf-8') as f:
f.write(f" 总发包: "+str(len(resultlist))+f" 最小值: {min_val}"+f" 最大值: {max_val}"+f" 平均值: {avg_val:.2f}"+f" 失败总数: "+str(resultlist.count("0")))

except KeyboardInterrupt:
print("\n用户中断监控")
finally:
end_time_actual = datetime.now()
with open(log_file, 'a', encoding='utf-8') as f:
f.write("\n========================================\n")
f.write(f"监控结束时间: {end_time_actual.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"实际监控时长: {str(end_time_actual - start_time)}\n")

print(f"监控已结束,结果已保存到 {log_file}")

if __name__ == "__main__":
resultlist = []
target_host = "128.4.0.213"
monitor_ping(target_host, 60)

python3测试运行通过。