1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| import os import sys import time import subprocess from datetime import datetime file_name = "scp.log" def append_to_file(file_path, content): try: with open(file_path, 'a', encoding='utf-8') as file: file.write(content + '\n') print(f"成功将内容追加到文件: {file_path}") except Exception as e: print(f"追加内容时出错: {e}") def create_test_file(size_mb, filename="scp_test.tmp"): """创建指定大小的测试文件""" try: with open(filename, 'wb') as f: f.write(os.urandom(int(size_mb * 1024 * 1024))) # 生成随机二进制数据 return filename except Exception as e: append_to_file(file_name,f"创建测试文件失败: {str(e)}") return None def scp_transfer_test(host, username, password, local_file, port=22): """ 调用系统scp命令测试传输 返回: 传输结果字典(含速度信息) """ if not os.path.exists(local_file): append_to_file(file_name,"测试文件不存在") return None # 构建包含密码的命令(使用sshpass自动输入密码,需系统安装sshpass) # 若系统无sshpass,可手动输入密码,但无法自动化测试 cmd = [ "sshpass", "-p", password, "scp", "-o StrictHostKeyChecking=no", "scp_test.tmp", f"{username}@{host}:/tmp" ] try: file_size = os.path.getsize(local_file) start_time = time.time() # 执行scp命令 result = subprocess.run( cmd ) end_time = time.time() elapsed = end_time - start_time # 检查执行结果 if result.returncode != 0: return { "success": False, "error": result.stderr.strip() } # 计算传输速度(MB/s) speed_mb_s = (file_size / (1024 * 1024)) / elapsed if elapsed > 0 else 0 return { "success": True, "size_mb": round(file_size / (1024 * 1024), 2), "time_sec": round(elapsed, 2), "speed_mb_s": round(speed_mb_s, 2), "local_file": local_file } except Exception as e: return { "success": False, "error": str(e) } def main(): # 配置测试参数 scp_HOST = "10.51.6.60" scp_USER = "sibajie" scp_PASS = "sibajie25" scp_PORT = 22 TEST_SIZE_MB = 1000 # 测试文件大小(MB) append_to_file(file_name,"开始时间:"+datetime.now().strftime('%Y-%m-%d %H:%M:%S')) append_to_file(file_name,f"=== scp传输测试 ===") append_to_file(file_name,f"目标服务器: {scp_HOST}:{scp_PORT}") append_to_file(file_name,f"测试文件大小: {TEST_SIZE_MB}MB") # 1. 创建测试文件 local_file = create_test_file(TEST_SIZE_MB) if not local_file: sys.exit(1) # 2. 执行SCP上传测试 append_to_file(file_name,"开始上传测试...") result = scp_transfer_test( host=scp_HOST, username=scp_USER, password=scp_PASS, local_file=local_file, port=scp_PORT ) # 3. 输出结果 if result and result["success"]: append_to_file(file_name,f"传输成功!") append_to_file(file_name,f"文件大小: {result['size_mb']}MB") append_to_file(file_name,f"耗时: {result['time_sec']}秒") append_to_file(file_name,f"传输速度: {result['speed_mb_s']} MB/s") else: append_to_file(file_name,f"传输失败: {result['error'] if result else '未知错误'}") # 4. 清理本地测试文件 if os.path.exists(local_file): os.remove(local_file) if __name__ == "__main__": main()
|