FoE (File over EtherCAT)
FoE 协议用于 EtherCAT 从站的文件传输,支持固件更新、配置文件上传下载等场景。
通过 slave.foe 访问。从站不支持 FoE 时为 None。
属性
| 属性 | 类型 | 访问 | 说明 |
|---|---|---|---|
| default_timeout_ms | int | 读写 | 默认超时时间(毫秒),默认 30000(30秒) |
| default_password | int | 读写 | 默认密码,默认 0 |
文件传输
read()
def read(self, filename: str, password: int = 0) -> Optional[bytes]
从从站设备读取文件。
参数:
filename(str) — 要读取的文件名password(int) — FoE 密码,默认 0
返回值:
bytes | None— 文件内容字节数组,失败返回None
示例:
firmware = slave.foe.read("firmware.bin")
config = slave.foe.read("config.xml", password=0x1234)
write()
def write(self, filename: str, data: bytes, password: int = 0) -> bool
向从站设备写入文件。
示例:
with open("new_firmware.bin", "rb") as f:
new_firmware = f.read()
success = slave.foe.write("firmware.bin", new_firmware)
传输进度
set_progress_callback()
def set_progress_callback(self, callback: Optional[Callable[[int, int, int], None]]) -> bool
设置 FoE 传输进度回调。传入 None 清除回调。
回调参数:
slave_index(int) — 从站编号packet_number(int) — 数据包序号data_size(int) — 数据大小(字节)
示例:
def on_progress(slave_index, packet_number, data_size):
print(f"\r传输进度: 包 {packet_number}, 大小 {data_size}", end="")
slave.foe.set_progress_callback(on_progress)
enable_progress_hook()
def enable_progress_hook(self, estimated_total_packets: int = 0) -> bool
启用 FoE 进度钩子(带百分比计算)。启用后,download / upload 操作会自动触发进度报告,回调参数增加 progress_percent。
参数:
estimated_total_packets(int) — 估算的总数据包数(用于计算进度百分比,0 = 不估算)
disable_progress_hook()
def disable_progress_hook(self) -> bool
禁用 FoE 进度钩子。
estimate_packet_count()
@staticmethod
def estimate_packet_count(file_size: int, mailbox_size: int = 512) -> int
估算文件传输所需的数据包数量。
参数:
file_size(int) — 文件大小(字节)mailbox_size(int) — 邮箱大小(字节,默认 512)
示例:
from ethercat.slave.foe import FoE
# 使用带百分比的进度回调
def on_progress(slave_index, packet_number, data_size, progress_percent):
print(f"\r传输进度: {progress_percent}% (包 {packet_number})", end="")
slave.foe._on_progress = on_progress
with open("new_firmware.bin", "rb") as f:
firmware = f.read()
packets = FoE.estimate_packet_count(len(firmware))
slave.foe.enable_progress_hook(packets)
ok = slave.foe.upload("firmware.bin", firmware)
slave.foe.disable_progress_hook()
print("\n上传完成" if ok else "\n上传失败")
错误处理
FoEErrorCode 枚举
FoE 协议定义的错误码,用于诊断传输失败原因。
class FoEErrorCode(IntEnum):
NotDefined = 0x8000 # 未定义错误
NotFound = 0x8001 # 文件未找到
AccessDenied = 0x8002 # 访问被拒绝
DiskFull = 0x8003 # 磁盘已满
Illegal = 0x8004 # 非法操作
PacketNumberWrong = 0x8005 # 数据包序号错误
AlreadyExists = 0x8006 # 文件已存在
NoUser = 0x8007 # 无此用户
BootstrapOnly = 0x8008 # 仅 Bootstrap 模式可用
NotBootstrap = 0x8009 # 不在 Bootstrap 模式
NoRights = 0x800A # 权限不足
ProgramError = 0x800B # 程序错误
get_error_description()
@staticmethod
def get_error_description(error_code: FoEErrorCode) -> str
获取 FoE 错误码的中文描述文本。
last_error_code 属性
@property
def last_error_code(self) -> FoEErrorCode
最近一次 FoE 操作的错误码。当 download / upload / read / write 失败时,此属性包含具体的错误码。
示例:
from ethercat.data.types import FoEErrorCode
from ethercat.slave.foe import FoE
data = slave.foe.download("firmware.bin")
if data is None:
error = slave.foe.last_error_code
desc = FoE.get_error_description(error)
print(f"下载失败: {desc}")
完整示例
固件更新
if slave.foe is not None:
# 备份当前固件
backup = slave.foe.read("firmware.bin")
if backup is not None:
with open("firmware_backup.bin", "wb") as f:
f.write(backup)
# 上传新固件(带进度回调)
def on_progress(s, pkt, sz):
print(f"\r更新进度: 包 {pkt}, 大小 {sz}", end="")
slave.foe.set_progress_callback(on_progress)
with open("new_firmware.bin", "rb") as f:
new_firmware = f.read()
ok = slave.foe.write("firmware.bin", new_firmware)
print("\n固件更新成功" if ok else "\n固件更新失败")
配置文件传输
if slave.foe is not None:
slave.foe.default_password = 0x12345678
slave.foe.default_timeout_ms = 10000 # 10秒
# 读取配置
config = slave.foe.read("config.xml")
if config is not None:
xml_content = config.decode("utf-8")
print(f"配置内容:\n{xml_content}")
# 写入新配置
new_config = '<Config><Param1>100</Param1></Config>'
config_bytes = new_config.encode("utf-8")
slave.foe.write("config.xml", config_bytes)
取消 / BUSY Hook
固件升级常达分钟级, SDK 提供两组互补 API 处理长时间传输:
- cancel / clear_cancel — 主动中止进行中的 FoE 传输
- set_busy_hook — 从站处于 Flash 烧写阶段时, 定期接收 BUSY 帧并回调 (Done/Entire/文本)
与
set_progress_callback 的区别set_progress_callback— 每个数据包触发, 反映传输进度set_busy_hook— 从站 Flash 烧写期间触发, 反映设备端处理进度 (ETG.1000.6 Table 93 BUSY 帧)
FoEBusyEvent
@dataclass
class FoEBusyEvent:
slave_index: int = 0 # 从站索引
done: int = 0 # 已完成量
entire: int = 0 # 总量
text: str = "" # 从站返回的可选描述文本
retry_index: int = 0 # BUSY 帧重试序号
@property
def percent(self) -> float
"""完成百分比 (0..100). entire=0 时返回 0."""
cancel()
def cancel(self) -> bool
请求取消进行中的 FoE 传输。设置底层取消标志,当前正在等待的邮箱帧结束后 (下一次循环迭代) 主动中止传输。
返回值:
bool— 调用是否成功
clear_cancel()
def clear_cancel(self) -> bool
清除 FoE 取消标志 (异常复位用)。
通常不需手动调用
read / write / download / upload 入口会自动清零取消标志,
仅在异常 / 强制重置场景使用。
set_busy_hook()
def set_busy_hook(self,
callback: Optional[Callable[[FoEBusyEvent], None]]
) -> bool
设置 FoE BUSY 回调,接收下载/上传过程中的进度事件。
参数:
callback— 回调函数fn(FoEBusyEvent); 传入None清除 hook
返回值:
bool— 调用是否成功
回调在底层邮箱线程同步触发
应快速返回, 避免阻塞后续 BUSY 帧处理。
示例 — 同步取消:
def on_busy(e: FoEBusyEvent):
print(f"\r{e.percent:.1f}% {e.text} (retry={e.retry_index})", end="")
slave.foe.set_busy_hook(on_busy)
# 启动下载 (新线程以便可被取消)
import threading
result = [None]
def worker():
result[0] = slave.foe.write("firmware.bin", firmware_bytes)
t = threading.Thread(target=worker, daemon=True)
t.start()
# 用户决定取消
import time
time.sleep(5)
slave.foe.cancel() # 下一次循环起生效
t.join()
print("\n取消成功" if not result[0] else "\n传输完成")
示例 — asyncio 超时取消:
import asyncio
def on_busy(e: FoEBusyEvent):
print(f"{e.percent:.1f}% {e.text}")
slave.foe.set_busy_hook(on_busy)
async def download_firmware():
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None, slave.foe.write, "firmware.bin", firmware_bytes
)
task = asyncio.create_task(download_firmware())
try:
ok = await asyncio.wait_for(task, timeout=60.0)
except asyncio.TimeoutError:
slave.foe.cancel()
ok = False
print("FoE 下载超时, 已请求取消")
示例 — 清除 hook:
# 完成后建议清除, 避免后续传输继续触发回调
slave.foe.set_busy_hook(None)