Python Tkinter 与 Socket 实现图片传输应用

在网络通信领域,文件传输是一个常见的需求,而图形用户界面 (GUI) 则能极大提升用户操作体验。本文将结合imgClient1.pyimgServer1.py两个示例程序,详细介绍如何使用 Python 的 Tkinter 库构建图形界面,并结合 Socket 实现图片的网络传输功能。

Tkinter:Python 的 GUI 库

Tkinter 是 Python 自带的图形用户界面工具包,无需额外安装即可使用,超级适合快速开发简单到中等复杂度的桌面应用程序。在我们的图片传输应用中,Tkinter 被用于创建客户端和服务器的操作界面。

界面设计核心要素

两个程序都采用了面向对象的方式构建界面,通过创建主类(ImageTransferClient和ImageServerApp)来封装所有功能:

窗口初始化:设置窗口标题、大小和是否可调整

self.root = root
self.root.title("圖片傳輸客戶端")
self.root.geometry("600x400")
self.root.resizable(False, False)

布局管理:使用网格 (Grid) 布局管理器组织界面元素

main_frame = ttk.Frame(self.root, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)

# 设置网格权重,使界面元素能够响应窗口大小变化
main_frame.columnconfigure(1, weight=1)
main_frame.rowconfigure(4, weight=1)

常用控件:输入控件(Entry):用于输入 IP 地址、端口号等按钮控件(Button):触发各种操作(选择文件夹、发送文件等)进度条(Progressbar):显示文件传输进度文本区域(Text/ScrolledText):显示状态信息和日志

界面更新:在多线程环境下安全更新 UI 元素

# 使用after()方法确保UI更新在主线程执行
self.root.after(0, self.update_progress, progress_percentage)

Socket:网络通信的基础

Socket(套接字)是网络编程的基础,它提供了进程间通信的接口。在本应用中,我们使用 TCP 套接字(SOCK_STREAM)实现可靠的图片传输。

TCP 通信的基本流程

服务器端

创建套接字

绑定到特定 IP 和端口

监听连接请求

接受客户端连接收

发数据

关闭连接

# 服务器创建和绑定过程
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind(('0.0.0.0', port))
self.server_socket.listen(5)

客户端

  • 创建套接字
  • 连接到服务器
  • 收发数据
  • 关闭连接
# 客户端连接过程
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((ip_address, port))

图片传输的实现细节

  1. 数据打包与解析:使用struct模块处理二进制数据
# 客户端打包文件信息
file_info = struct.pack('128sIq', file_name_padded, file_ext, file_size)
client_socket.sendall(file_info)

# 服务器解析文件信息
file_name, file_ext, file_size = struct.unpack('128sIq', buf)

2.文件传输:分块读取和发送文件内容

# 客户端发送文件内容
with open(file_path, 'rb') as f:
    total_sent = 0
    buffer = bytearray(4096)
    while total_sent < file_size:
        bytes_read = f.readinto(buffer)
        client_socket.sendall(buffer[:bytes_read])
        total_sent += bytes_read

3.确认机制:确保文件完整传输

# 服务器发送确认信息
client_socket.sendall(b'x01')  # 传输成功
# 或
client_socket.sendall(b'x00')  # 传输失败

多线程处理并发

为了避免界面冻结和实现并发处理,两个程序都使用了多线程技术:

  1. 服务器端:为每个客户端连接创建单独的线程
client_thread = threading.Thread(
    target=self.handle_client,
    args=(client_socket, client_addr),
    daemon=True
)
self.client_threads.append(client_thread)
client_thread.start()

2.客户端:使用线程池并发发送多个文件

with ThreadPoolExecutor(max_workers=thread_count) as executor:
    futures = [executor.submit(self.send_single_image, ip_address, port, file_path)
               for file_path in self.image_files]

3.线程安全:使用锁机制确保共享资源的正确访问

with self.progress_lock:
    self.sent_files += 1
    progress_percentage = int((self.sent_files / self.total_files) * 100)

错误处理与用户反馈

一个健壮的应用程序必须包含完善的错误处理机制:

  1. 输入验证:确保用户输入有效的 IP 地址、端口号等
try:
    port = int(port_text)
    if port < 1 or port > 65535:
        raise ValueError
except ValueError:
    messagebox.showinfo("提示", "請輸入有效的端口號 (1-65535)")

2.网络错误处理:针对不同的网络错误提供明确的提示

except socket.error as ex:
    error_msg = ""
    if ex.errno == 10061:
        error_msg = "無法連接到服務器,請檢查服務器是否已啟動且端口正確"
    elif ex.errno == 10049:
        error_msg = "無效的IP地址,請確認輸入正確"
    # 其他错误类型...

3.用户反馈:通过消息框和日志及时反馈操作结果

# 消息框提示
messagebox.showinfo("完成", f"成功發送 {self.total_files} 個圖片文件")

# 日志记录
self.log(f"文件接收完成: {file_name}")

imgClient1.py 源码:

import tkinter as tk
from tkinter import filedialog, messagebox, ttk
import socket
import os
import threading
import struct
from tkinter.scrolledtext import ScrolledText
from concurrent.futures import ThreadPoolExecutor, as_completed


class ImageTransferClient:
    def __init__(self, root):
        self.root = root
        self.root.title("圖片傳輸客戶端")
        self.root.geometry("600x400")
        self.root.resizable(False, False)

        # 支援的圖片文件副檔名
        self.image_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.ico', '.webp')

        # 選中的文件夾路徑
        self.selected_folder_path = ""
        # 圖片文件列表
        self.image_files = []

        # 多线程相关
        self.total_files = 0
        self.sent_files = 0
        self.progress_lock = threading.Lock()  # 进度更新锁

        self.create_widgets()

        # 初始化端口为8888
        self.port_entry.insert(0, "8888")

    def create_widgets(self):
        # 保持原有的UI创建逻辑不变
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)

        # IP地址标签和输入框
        ttk.Label(main_frame, text="目標IP地址:").grid(row=0, column=0, sticky=tk.W, pady=5)
        self.ip_entry = ttk.Entry(main_frame, width=20)
        self.ip_entry.grid(row=0, column=1, sticky=tk.W, pady=5)

        # 端口标签和输入框
        ttk.Label(main_frame, text="端口號:").grid(row=0, column=2, sticky=tk.W, pady=5, padx=(10, 0))
        self.port_entry = ttk.Entry(main_frame, width=10)
        self.port_entry.grid(row=0, column=3, sticky=tk.W, pady=5)

        # 文件夹路径标签和输入框
        ttk.Label(main_frame, text="圖片文件夾:").grid(row=1, column=0, sticky=tk.W, pady=5)
        self.folder_entry = ttk.Entry(main_frame, width=40)
        self.folder_entry.grid(row=1, column=1, columnspan=3, sticky=tk.W, pady=5)

        # 选择文件夹按钮
        self.select_folder_btn = ttk.Button(main_frame, text="選擇文件夾", command=self.select_folder)
        self.select_folder_btn.grid(row=1, column=4, padx=5, pady=5)

        # 并发数设置
        ttk.Label(main_frame, text="并发数:").grid(row=2, column=0, sticky=tk.W, pady=5)
        self.thread_count = tk.StringVar(value="3")
        self.thread_entry = ttk.Entry(main_frame, textvariable=self.thread_count, width=5)
        self.thread_entry.grid(row=2, column=1, sticky=tk.W, pady=5)

        # 发送按钮
        self.send_btn = ttk.Button(main_frame, text="發送圖片", command=self.start_send)
        self.send_btn.grid(row=2, column=2, columnspan=3, pady=10)

        # 进度条
        self.progress_bar = ttk.Progressbar(main_frame, orient="horizontal", length=100, mode="determinate")
        self.progress_bar.grid(row=3, column=0, columnspan=5, sticky=tk.EW, pady=5)
        self.progress_bar.grid_remove()

        # 状态标签
        ttk.Label(main_frame, text="狀態:").grid(row=4, column=0, sticky=tk.NW, pady=5)

        # 状态文本框
        self.status_text = ScrolledText(main_frame, height=10, width=60, state=tk.DISABLED)
        self.status_text.grid(row=4, column=1, columnspan=4, sticky=tk.NSEW, pady=5)

        # 设置网格权重
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(4, weight=1)
        main_frame.columnconfigure(0, weight=1)
        main_frame.columnconfigure(2, weight=1)
        main_frame.columnconfigure(3, weight=1)
        main_frame.columnconfigure(4, weight=1)

    def select_folder(self):
        """选择包含图片的文件夹"""
        folder_path = filedialog.askdirectory(title="請選擇包含圖片的文件夾")
        if folder_path:
            self.selected_folder_path = folder_path
            self.folder_entry.delete(0, tk.END)
            self.folder_entry.insert(0, folder_path)
            self.find_image_files()

    def find_image_files(self):
        """查找文件夹中的所有图片文件"""
        try:
            self.image_files.clear()
            all_files = [f for f in os.listdir(self.selected_folder_path)
                         if os.path.isfile(os.path.join(self.selected_folder_path, f))]

            for file in all_files:
                ext = os.path.splitext(file)[1].lower()
                if ext in self.image_extensions:
                    self.image_files.append(os.path.join(self.selected_folder_path, file))

            self.update_status(f"找到 {len(self.image_files)} 個圖片文件")
        except Exception as ex:
            messagebox.showerror("錯誤", f"查找圖片時出錯: {str(ex)}")
            self.update_status("查找圖片失敗")

    def update_status(self, message):
        """更新状态文本框"""
        self.status_text.config(state=tk.NORMAL)
        self.status_text.insert(tk.END, message + "
")
        self.status_text.see(tk.END)
        self.status_text.config(state=tk.DISABLED)

    def update_progress(self, value):
        """更新进度条"""
        self.progress_bar["value"] = value
        self.root.update_idletasks()

    def start_send(self):
        """开始发送图片"""
        # 输入验证
        ip_address = self.ip_entry.get().strip()
        port_text = self.port_entry.get().strip()

        try:
            thread_count = int(self.thread_count.get().strip())
            if thread_count < 1 or thread_count > 10:  # 限制并发数在1-10之间
                raise ValueError
        except ValueError:
            messagebox.showinfo("提示", "请输入有效的并发数 (1-10)")
            return

        if not ip_address:
            messagebox.showinfo("提示", "請輸入目標IP地址")
            return

        try:
            port = int(port_text)
            if port < 1 or port > 65535:
                raise ValueError
        except ValueError:
            messagebox.showinfo("提示", "請輸入有效的端口號 (1-65535)")
            return

        if not self.selected_folder_path or len(self.image_files) == 0:
            messagebox.showinfo("提示", "請先選擇包含圖片的文件夾")
            return

        # 禁用按钮,防止重复发送
        self.select_folder_btn.config(state=tk.DISABLED)
        self.send_btn.config(state=tk.DISABLED)
        self.progress_bar.grid()
        self.update_progress(0)
        self.update_status("準備發送...")

        # 初始化进度计数
        self.total_files = len(self.image_files)
        self.sent_files = 0

        # 在新线程中管理发送任务,避免UI冻结
        threading.Thread(
            target=self.manage_send_tasks,
            args=(ip_address, port, thread_count),
            daemon=True
        ).start()

    def manage_send_tasks(self, ip_address, port, thread_count):
        """管理多线程发送任务"""
        try:
            self.update_status(f"嘗試連接到 {ip_address}:{port}...")
            self.update_status(f"準備發送 {len(self.image_files)} 個文件,使用 {thread_count} 個線程")

            # 使用线程池并发发送文件
            with ThreadPoolExecutor(max_workers=thread_count) as executor:
                # 提交所有任务
                futures = [executor.submit(self.send_single_image, ip_address, port, file_path)
                           for file_path in self.image_files]

                # 监控任务完成情况
                for future in as_completed(futures):
                    try:
                        result = future.result()
                        if result:
                            self.update_status(result)
                    except Exception as ex:
                        self.update_status(f"線程錯誤: {str(ex)}")

            # 全部完成后更新状态
            self.root.after(0, self.update_progress, 100)
            self.update_status(f"所有 {self.total_files} 個文件發送完成")
            self.root.after(0, lambda: messagebox.showinfo("完成", f"成功發送 {self.total_files} 個圖片文件"))

        except socket.error as ex:
            error_msg = ""
            if ex.errno == 10061:
                error_msg = "無法連接到服務器,請檢查服務器是否已啟動且端口正確"
            elif ex.errno == 10049:
                error_msg = "無效的IP地址,請確認輸入正確"
            elif ex.errno == 10060:
                error_msg = "連接超時,服務器沒有響應"
            elif ex.errno == 10054:
                error_msg = "連接被服務器關閉"
            elif ex.errno == 10053:
                error_msg = "連接被中止,可能是服務器解析數據出錯"
            else:
                error_msg = f"網路錯誤: {str(ex)} (錯誤代碼: {ex.errno})"

            self.update_status(error_msg)
        except Exception as ex:
            self.update_status(f"發送失敗: {str(ex)}")
        finally:
            self.reset_ui()

    def send_single_image(self, ip_address, port, file_path):
        """发送单个图片文件(供线程调用)"""
        client_socket = None
        try:
            file_name = os.path.basename(file_path)

            # 检查文件是否存在
            if not os.path.exists(file_path):
                return f"文件不存在: {file_name}"

            # 获取文件大小
            file_size = os.path.getsize(file_path)

            # 创建新的socket连接
            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client_socket.settimeout(10.0)
            client_socket.connect((ip_address, port))

            # 打包文件信息:128字节文件名 + 4字节文件类型(0表明文件) + 8字节文件大小
            # 确保文件名编码后不超过128字节
            file_name_bytes = file_name.encode('utf-8')[:128]  # 截断过长的文件名
            # 不足128字节的用空字节填充
            file_name_padded = file_name_bytes.ljust(128, b'x00')
            file_ext = 0  # 0表明文件,1表明文件夹

            # 发送文件信息
            file_info = struct.pack('128sIq', file_name_padded, file_ext, file_size)
            client_socket.sendall(file_info)

            # 发送文件内容
            with open(file_path, 'rb') as f:
                total_sent = 0
                buffer = bytearray(4096)
                while total_sent < file_size:
                    bytes_read = f.readinto(buffer)
                    if bytes_read == 0:
                        break
                    client_socket.sendall(buffer[:bytes_read])
                    total_sent += bytes_read

            # 等待服务器确认
            confirmation = client_socket.recv(1)
            if not confirmation or confirmation[0] != 1:
                return f"未收到服務器確認: {file_name}"

            # 更新进度
            with self.progress_lock:
                self.sent_files += 1
                progress_percentage = int((self.sent_files / self.total_files) * 100)
                self.root.after(0, self.update_progress, progress_percentage)

            return f"成功發送: {file_name}"

        except Exception as ex:
            return f"發送文件 {file_name} 出錯: {str(ex)}"
        finally:
            if client_socket:
                try:
                    client_socket.close()
                except:
                    pass

    def reset_ui(self):
        """重置UI状态"""
        self.root.after(0, lambda: self.select_folder_btn.config(state=tk.NORMAL))
        self.root.after(0, lambda: self.send_btn.config(state=tk.NORMAL))
        self.root.after(0, lambda: self.progress_bar.grid_remove())


if __name__ == "__main__":
    root = tk.Tk()
    app = ImageTransferClient(root)
    root.mainloop()

imgServer1.py 源码:

import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import socket
import threading
import os
import struct
import time


class ImageServerApp:
    def __init__(self, root):
        self.root = root
        self.root.title("圖片伺服器")
        self.root.geometry("600x400")

        # 伺服器狀態
        self.is_running = False
        self.server_socket = None
        self.client_threads = []

        # 接收文件保存路徑
        self.save_dir = os.path.join(os.getcwd(), "received_files")
        if not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)

        # 創建界面
        self.create_widgets()

    def create_widgets(self):
        # 配置網格布局
        self.root.grid_columnconfigure(0, weight=1)
        self.root.grid_rowconfigure(3, weight=1)

        # 伺服器設置區域
        setting_frame = ttk.LabelFrame(self.root, text="伺服器設置")
        setting_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew")
        setting_frame.grid_columnconfigure(1, weight=1)

        ttk.Label(setting_frame, text="端口:").grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.port_entry = ttk.Entry(setting_frame)
        self.port_entry.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
        self.port_entry.insert(0, "8888")

        ttk.Label(setting_frame, text="保存路徑:").grid(row=1, column=0, padx=5, pady=5, sticky="w")
        self.path_var = tk.StringVar(value=self.save_dir)
        ttk.Entry(setting_frame, textvariable=self.path_var).grid(row=1, column=1, padx=5, pady=5, sticky="ew")

        browse_btn = ttk.Button(setting_frame, text="瀏覽...", command=self.browse_save_dir)
        browse_btn.grid(row=1, column=2, padx=5, pady=5)

        # 控制按鈕區域
        btn_frame = ttk.Frame(self.root)
        btn_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")

        self.start_btn = ttk.Button(btn_frame, text="啟動伺服器", command=self.start_server)
        self.start_btn.pack(side=tk.LEFT, padx=5)

        self.stop_btn = ttk.Button(btn_frame, text="停止伺服器", command=self.stop_server, state=tk.DISABLED)
        self.stop_btn.pack(side=tk.LEFT, padx=5)

        # 狀態區域
        status_frame = ttk.LabelFrame(self.root, text="狀態")
        status_frame.grid(row=2, column=0, padx=10, pady=5, sticky="ew")

        self.status_var = tk.StringVar(value="伺服器未啟動")
        ttk.Label(status_frame, textvariable=self.status_var).pack(anchor="w", padx=5, pady=5)

        # 日誌區域
        log_frame = ttk.LabelFrame(self.root, text="接收日誌")
        log_frame.grid(row=3, column=0, padx=10, pady=5, sticky="nsew")
        log_frame.grid_columnconfigure(0, weight=1)
        log_frame.grid_rowconfigure(0, weight=1)

        self.log_text = tk.Text(log_frame, wrap=tk.WORD, state=tk.DISABLED)
        self.log_text.grid(row=0, column=0, sticky="nsew")

        scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview)
        scrollbar.grid(row=0, column=1, sticky="ns")
        self.log_text.config(yscrollcommand=scrollbar.set)

    def browse_save_dir(self):
        directory = filedialog.askdirectory(title="選擇保存路徑")
        if directory:
            self.path_var.set(directory)
            self.save_dir = directory

    def log(self, message):
        """在日誌區域顯示消息"""
        self.log_text.config(state=tk.NORMAL)
        timestamp = time.strftime("%H:%M:%S")
        self.log_text.insert(tk.END, f"[{timestamp}] {message}
")
        self.log_text.see(tk.END)
        self.log_text.config(state=tk.DISABLED)

    def get_local_ip(self):
        """獲取本機區域網IP地址"""
        try:
            # 通過連接外部地址獲取本機出口IP
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 80))  # 連接到公共DNS伺服器
                return s.getsockname()[0]
        except:
            # 出錯時返回本地回環地址
            return "127.0.0.1"

    def start_server(self):
        """啟動伺服器"""
        try:
            port = int(self.port_entry.get())
            if port < 1 or port > 65535:
                messagebox.showerror("錯誤", "端口號必須在1-65535之間")
                return

            self.save_dir = self.path_var.get()
            if not os.path.exists(self.save_dir):
                os.makedirs(self.save_dir)

            # 獲取本機IP地址
            local_ip = self.get_local_ip()

            # 創建伺服器socket
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.bind(('0.0.0.0', port))
            self.server_socket.listen(5)
            self.is_running = True

            # 更新狀態和按鈕
            self.status_var.set(f"伺服器運行中,監聽端口: {port}")
            self.start_btn.config(state=tk.DISABLED)
            self.stop_btn.config(state=tk.NORMAL)
            self.log(f"伺服器已啟動,監聽端口: {port}")
            self.log(f"服務端IP地址: {local_ip}")

            # 啟動接受客戶端連接的線程
            threading.Thread(target=self.accept_clients, daemon=True).start()

        except Exception as e:
            messagebox.showerror("啟動失敗", f"無法啟動伺服器: {str(e)}")
            self.log(f"啟動失敗: {str(e)}")

    def stop_server(self):
        """停止伺服器"""
        if self.is_running and self.server_socket:
            self.is_running = False
            try:
                # 關閉伺服器socket
                self.server_socket.close()
                # 等待所有客戶端線程結束
                for thread in self.client_threads:
                    if thread.is_alive():
                        thread.join(timeout=1.0)

                self.status_var.set("伺服器已停止")
                self.start_btn.config(state=tk.NORMAL)
                self.stop_btn.config(state=tk.DISABLED)
                self.log("伺服器已停止")

            except Exception as e:
                self.log(f"停止伺服器時出錯: {str(e)}")

    def accept_clients(self):
        """接受客戶端連接"""
        while self.is_running:
            try:
                client_socket, client_addr = self.server_socket.accept()
                self.log(f"新連接: {client_addr[0]}:{client_addr[1]}")

                # 為每個客戶端創建一個新線程處理
                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, client_addr),
                    daemon=True
                )
                self.client_threads.append(client_thread)
                client_thread.start()

            except Exception as e:
                if self.is_running:  # 如果不是正常關閉導致的錯誤
                    self.log(f"接受連接時出錯: {str(e)}")
                break

    def handle_client(self, client_socket, client_addr):
        """處理客戶端連接,接收文件和文件夾"""
        try:
            while self.is_running:
                # 先接收文件信息(固定140字節:128sIq)
                file_info_size = struct.calcsize('128sIq')
                buf = b''  # 用於累加接收的數據

                # 循環接收,直到湊夠預期的字節數
                while len(buf) < file_info_size:
                    remaining = file_info_size - len(buf)
                    chunk = client_socket.recv(remaining)  # 接收剩餘所需字節
                    if not chunk:  # 客戶端斷開連接
                        self.log(f"客戶端 {client_addr} 提前斷開連接")
                        return
                    buf += chunk

                # 確認接收完整後再解析
                if len(buf) != file_info_size:
                    self.log(f"文件信息不完整,預期{file_info_size}字節,實際{len(buf)}字節")
                    break

                # 解析文件信息
                file_name, file_ext, file_size = struct.unpack('128sIq', buf)
                file_name = file_name.decode().strip('x00')  # 去除填充的空字符

                # 檢查是否是文件夾
                if file_ext == 1:  # 約定1表明文件夾
                    folder_path = os.path.join(self.save_dir, file_name)
                    if not os.path.exists(folder_path):
                        os.makedirs(folder_path)
                        self.log(f"創建文件夾: {file_name}")
                    continue

                # 接收文件內容
                self.log(f"開始接收文件: {file_name}, 大小: {self.format_size(file_size)}")

                # 確保文件所在目錄存在
                file_path = os.path.join(self.save_dir, file_name)
                file_dir = os.path.dirname(file_path)
                if not os.path.exists(file_dir):
                    os.makedirs(file_dir)

                # 接收文件數據
                received_size = 0
                with open(file_path, 'wb') as f:
                    while received_size < file_size and self.is_running:
                        recv_size = min(1024 * 1024, file_size - received_size)  # 每次最多接收1MB
                        data = client_socket.recv(recv_size)
                        if not data:
                            break
                        f.write(data)
                        received_size += len(data)

                        # 計算並顯示進度
                        progress = (received_size / file_size) * 100
                        if progress % 10 == 0:  # 每10%記錄一次日誌
                            self.log(f"接收 {file_name}: {progress:.1f}%")

                if received_size == file_size:
                    self.log(f"文件接收完成: {file_name}")
                    # 發送確認信息
                    client_socket.sendall(b'x01')
                else:
                    self.log(f"文件接收中斷: {file_name}, 已接收: {self.format_size(received_size)}")
                    # 如果文件未接收完整,刪除不完整文件
                    if os.path.exists(file_path):
                        os.remove(file_path)
                    client_socket.sendall(b'x00')

        except Exception as e:
            self.log(f"處理客戶端 {client_addr} 時出錯: {str(e)}")

        finally:
            client_socket.close()
            self.log(f"客戶端 {client_addr} 連接已關閉")

    def format_size(self, size_bytes):
        """格式化文件大小顯示"""
        units = ['B', 'KB', 'MB', 'GB']
        size = size_bytes
        unit_index = 0
        while size >= 1024 and unit_index < len(units) - 1:
            size /= 1024
            unit_index += 1
        return f"{size:.2f} {units[unit_index]}"


if __name__ == "__main__":
    root = tk.Tk()
    app = ImageServerApp(root)
    root.mainloop()

Python Tkinter 与 Socket 实现图片传输应用

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
-江鹤鸣川-的头像 - 鹿快
评论 抢沙发

请登录后发表评论

    暂无评论内容