# Ping monitor script: pings a host repeatedly and appends the results to a log file.
import os
import platform
import re
import subprocess
import time
from datetime import datetime, timedelta
def get_stats(numbers):
    """Return the minimum, maximum and arithmetic mean of *numbers*.

    Args:
        numbers: A non-empty sequence of mutually comparable numeric values.

    Returns:
        A ``(minimum, maximum, mean)`` tuple.

    Raises:
        ValueError: If *numbers* is empty.
    """
    # Guard first: min()/max() and the division below all need at least one item.
    if not numbers:
        raise ValueError("列表不能为空")

    return min(numbers), max(numbers), sum(numbers) / len(numbers)
# Matches the round-trip time of a ping reply, e.g. "time=0.045 ms",
# "time<1ms" or "时间=12ms". It keys on the "=<number>ms" shape instead of the
# label text so parsing does not depend on the operating system's language.
_PING_TIME_RE = re.compile(r"[=<]\s*(\d+(?:\.\d+)?)\s*ms", re.IGNORECASE)


def ping(host):
    """Send a single ICMP echo request to *host* using the system ``ping``.

    Args:
        host: Host name or IP address to ping.

    Returns:
        ``(True, round_trip_ms)`` with the time as a float when a reply was
        received, otherwise ``(False, message)`` where *message* is a string
        describing the failure.
    """
    # Reject empty values and anything that ping would read as an option.
    if not isinstance(host, str) or not host.strip() or host.strip().startswith('-'):
        return (False, "错误: 无效的主机地址")
    target = host.strip()

    system = platform.system().lower()
    if system == 'windows':
        # Windows: -n is the packet count, -w the reply timeout in milliseconds.
        command = ['ping', '-n', '1', '-w', '2000', target]
    elif system == 'darwin':
        # macOS: -W is the reply timeout in milliseconds.
        command = ['ping', '-c', '1', '-W', '2000', target]
    else:
        # Linux (iputils): -W is the reply timeout in seconds.
        command = ['ping', '-c', '1', '-W', '2', target]

    try:
        # An argument list (no shell) keeps the host from being interpreted as
        # shell syntax; the hard timeout stops a stuck ping from blocking forever.
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            universal_newlines=True,
            errors='replace',
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return (False, "请求超时")
    except (OSError, subprocess.SubprocessError) as e:
        return (False, f"错误: {e}")

    response = completed.stdout or ""
    # A reply line always carries a TTL field ("TTL=" on Windows, "ttl=" elsewhere).
    if "ttl=" not in response.lower():
        return (False, "请求超时")

    match = _PING_TIME_RE.search(response)
    if match is None:
        return (False, "错误: 无法解析响应时间")
    return (True, float(match.group(1)))
def monitor_ping(host, duration_minutes=60, log_file='/opt/ping.log', interval_seconds=1):
    """Ping *host* repeatedly and append every result to a log file.

    One ping is sent per iteration until *duration_minutes* have elapsed or
    the user presses Ctrl-C. A summary line (packets sent, min/max/average
    round-trip time of the successful pings, number of failures) and a footer
    are always written, including when the run is interrupted.

    Args:
        host: Host name or IP address passed to ``ping()``.
        duration_minutes: How long to keep monitoring, in minutes.
        log_file: Path of the log file; it is opened in append mode.
        interval_seconds: Pause between two consecutive pings, in seconds.

    Returns:
        None.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    start_time = datetime.now()
    end_time = start_time + timedelta(minutes=duration_minutes)

    print(f"开始监控 {host},持续 {duration_minutes} 分钟...")
    print(f"结果将保存到: {log_file}")

    # Results are kept local so the function does not depend on a module-level
    # list, and failures are counted separately so the latency statistics only
    # ever see numbers.
    latencies = []
    failure_count = 0

    with open(log_file, 'a', encoding='utf-8') as f:
        f.write(f"Ping监控日志 - 目标: {host}\n")
        f.write(f"开始时间: {start_time.strftime(time_format)}\n")
        f.write(f"预计结束时间: {end_time.strftime(time_format)}\n")
        f.write("========================================\n\n")
        f.flush()

        try:
            while datetime.now() < end_time:
                current_time = datetime.now()
                success, result = ping(host)

                log_entry = f"{current_time.strftime(time_format)} - "
                if success:
                    log_entry += f"响应时间: {result} ms"
                    latencies.append(result)
                else:
                    log_entry += f"失败: {result}"
                    failure_count += 1

                print(log_entry)
                f.write(log_entry + "\n")
                # Flush per entry so the log stays current if the process dies.
                f.flush()

                time.sleep(interval_seconds)
        except KeyboardInterrupt:
            print("\n用户中断监控")
        finally:
            end_time_actual = datetime.now()
            total_sent = len(latencies) + failure_count
            if latencies:
                min_val, max_val, avg_val = get_stats(latencies)
                stats_text = f" 最小值: {min_val} 最大值: {max_val} 平均值: {avg_val:.2f}"
            else:
                # No successful ping: there is nothing to compute statistics on.
                stats_text = " 最小值: N/A 最大值: N/A 平均值: N/A"
            f.write(f" 总发包: {total_sent}{stats_text} 失败总数: {failure_count}")
            f.write("\n========================================\n")
            f.write(f"监控结束时间: {end_time_actual.strftime(time_format)}\n")
            f.write(f"实际监控时长: {end_time_actual - start_time}\n")

    print(f"监控已结束,结果已保存到 {log_file}")
if __name__ == "__main__":
    # Module-level results list; it is defined before monitoring starts
    # because code in this file may refer to `resultlist` as a global.
    resultlist = []

    # Host to monitor; the run lasts 60 minutes.
    target_host = "128.4.0.213"
    monitor_ping(host=target_host, duration_minutes=60)