跳到主要内容

FoE (File over EtherCAT)

FoE 协议用于 EtherCAT 从站的文件传输,支持固件更新、配置文件上传下载等场景。

通过 slave.foe 访问。从站不支持 FoE 时为 None

属性

属性类型访问说明
default_timeout_msint读写默认超时时间(毫秒),默认 30000(30秒)
default_passwordint读写默认密码,默认 0

文件传输

read()

def read(self, filename: str, password: int = 0) -> Optional[bytes]

从从站设备读取文件。

参数:

  • filename (str) — 要读取的文件名
  • password (int) — FoE 密码,默认 0

返回值:

  • bytes | None — 文件内容字节数组,失败返回 None

示例:

firmware = slave.foe.read("firmware.bin")
config = slave.foe.read("config.xml", password=0x1234)

write()

def write(self, filename: str, data: bytes, password: int = 0) -> bool

向从站设备写入文件。

示例:

with open("new_firmware.bin", "rb") as f:
new_firmware = f.read()
success = slave.foe.write("firmware.bin", new_firmware)

传输进度

set_progress_callback()

def set_progress_callback(self, callback: Optional[Callable[[int, int, int], None]]) -> bool

设置 FoE 传输进度回调。传入 None 清除回调。

回调参数:

  • slave_index (int) — 从站编号
  • packet_number (int) — 数据包序号
  • data_size (int) — 数据大小(字节)

示例:

def on_progress(slave_index, packet_number, data_size):
print(f"\r传输进度: 包 {packet_number}, 大小 {data_size}", end="")

slave.foe.set_progress_callback(on_progress)

enable_progress_hook()

def enable_progress_hook(self, estimated_total_packets: int = 0) -> bool

启用 FoE 进度钩子(带百分比计算)。启用后,download / upload 操作会自动触发进度报告,回调参数增加 progress_percent

参数:

  • estimated_total_packets (int) — 估算的总数据包数(用于计算进度百分比,0 = 不估算)

disable_progress_hook()

def disable_progress_hook(self) -> bool

禁用 FoE 进度钩子。

estimate_packet_count()

@staticmethod
def estimate_packet_count(file_size: int, mailbox_size: int = 512) -> int

估算文件传输所需的数据包数量。

参数:

  • file_size (int) — 文件大小(字节)
  • mailbox_size (int) — 邮箱大小(字节,默认 512)

示例:

from ethercat.slave.foe import FoE

# 使用带百分比的进度回调
def on_progress(slave_index, packet_number, data_size, progress_percent):
print(f"\r传输进度: {progress_percent}% (包 {packet_number})", end="")

slave.foe._on_progress = on_progress

with open("new_firmware.bin", "rb") as f:
firmware = f.read()

packets = FoE.estimate_packet_count(len(firmware))
slave.foe.enable_progress_hook(packets)

ok = slave.foe.upload("firmware.bin", firmware)
slave.foe.disable_progress_hook()
print("\n上传完成" if ok else "\n上传失败")

错误处理

FoEErrorCode 枚举

FoE 协议定义的错误码,用于诊断传输失败原因。

class FoEErrorCode(IntEnum):
NotDefined = 0x8000 # 未定义错误
NotFound = 0x8001 # 文件未找到
AccessDenied = 0x8002 # 访问被拒绝
DiskFull = 0x8003 # 磁盘已满
Illegal = 0x8004 # 非法操作
PacketNumberWrong = 0x8005 # 数据包序号错误
AlreadyExists = 0x8006 # 文件已存在
NoUser = 0x8007 # 无此用户
BootstrapOnly = 0x8008 # 仅 Bootstrap 模式可用
NotBootstrap = 0x8009 # 不在 Bootstrap 模式
NoRights = 0x800A # 权限不足
ProgramError = 0x800B # 程序错误

get_error_description()

@staticmethod
def get_error_description(error_code: FoEErrorCode) -> str

获取 FoE 错误码的中文描述文本。

last_error_code 属性

@property
def last_error_code(self) -> FoEErrorCode

最近一次 FoE 操作的错误码。当 download / upload / read / write 失败时,此属性包含具体的错误码。

示例:

from ethercat.data.types import FoEErrorCode
from ethercat.slave.foe import FoE

data = slave.foe.download("firmware.bin")
if data is None:
error = slave.foe.last_error_code
desc = FoE.get_error_description(error)
print(f"下载失败: {desc}")

完整示例

固件更新

if slave.foe is not None:
# 备份当前固件
backup = slave.foe.read("firmware.bin")
if backup is not None:
with open("firmware_backup.bin", "wb") as f:
f.write(backup)

# 上传新固件(带进度回调)
def on_progress(s, pkt, sz):
print(f"\r更新进度: 包 {pkt}, 大小 {sz}", end="")
slave.foe.set_progress_callback(on_progress)

with open("new_firmware.bin", "rb") as f:
new_firmware = f.read()
ok = slave.foe.write("firmware.bin", new_firmware)
print("\n固件更新成功" if ok else "\n固件更新失败")

配置文件传输

if slave.foe is not None:
slave.foe.default_password = 0x12345678
slave.foe.default_timeout_ms = 10000 # 10秒

# 读取配置
config = slave.foe.read("config.xml")
if config is not None:
xml_content = config.decode("utf-8")
print(f"配置内容:\n{xml_content}")

# 写入新配置
new_config = '<Config><Param1>100</Param1></Config>'
config_bytes = new_config.encode("utf-8")
slave.foe.write("config.xml", config_bytes)

取消 / BUSY Hook

固件升级常达分钟级, SDK 提供两组互补 API 处理长时间传输:

  • cancel / clear_cancel — 主动中止进行中的 FoE 传输
  • set_busy_hook — 从站处于 Flash 烧写阶段时, 定期接收 BUSY 帧并回调 (Done/Entire/文本)
set_progress_callback 的区别
  • set_progress_callback — 每个数据包触发, 反映传输进度
  • set_busy_hook — 从站 Flash 烧写期间触发, 反映设备端处理进度 (ETG.1000.6 Table 93 BUSY 帧)

FoEBusyEvent

@dataclass
class FoEBusyEvent:
slave_index: int = 0 # 从站索引
done: int = 0 # 已完成量
entire: int = 0 # 总量
text: str = "" # 从站返回的可选描述文本
retry_index: int = 0 # BUSY 帧重试序号

@property
def percent(self) -> float
"""完成百分比 (0..100). entire=0 时返回 0."""

cancel()

def cancel(self) -> bool

请求取消进行中的 FoE 传输。设置底层取消标志,当前正在等待的邮箱帧结束后 (下一次循环迭代) 主动中止传输。

返回值:

  • bool — 调用是否成功

clear_cancel()

def clear_cancel(self) -> bool

清除 FoE 取消标志 (异常复位用)。

通常不需手动调用

read / write / download / upload 入口会自动清零取消标志, 仅在异常 / 强制重置场景使用。

set_busy_hook()

def set_busy_hook(self,
callback: Optional[Callable[[FoEBusyEvent], None]]
) -> bool

设置 FoE BUSY 回调,接收下载/上传过程中的进度事件。

参数:

  • callback — 回调函数 fn(FoEBusyEvent); 传入 None 清除 hook

返回值:

  • bool — 调用是否成功
回调在底层邮箱线程同步触发

应快速返回, 避免阻塞后续 BUSY 帧处理。

示例 — 同步取消:

def on_busy(e: FoEBusyEvent):
print(f"\r{e.percent:.1f}% {e.text} (retry={e.retry_index})", end="")

slave.foe.set_busy_hook(on_busy)

# 启动下载 (新线程以便可被取消)
import threading
result = [None]
def worker():
result[0] = slave.foe.write("firmware.bin", firmware_bytes)
t = threading.Thread(target=worker, daemon=True)
t.start()

# 用户决定取消
import time
time.sleep(5)
slave.foe.cancel() # 下一次循环起生效
t.join()
print("\n取消成功" if not result[0] else "\n传输完成")

示例 — asyncio 超时取消:

import asyncio

def on_busy(e: FoEBusyEvent):
print(f"{e.percent:.1f}% {e.text}")

slave.foe.set_busy_hook(on_busy)

async def download_firmware():
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None, slave.foe.write, "firmware.bin", firmware_bytes
)

task = asyncio.create_task(download_firmware())
try:
ok = await asyncio.wait_for(task, timeout=60.0)
except asyncio.TimeoutError:
slave.foe.cancel()
ok = False
print("FoE 下载超时, 已请求取消")

示例 — 清除 hook:

# 完成后建议清除, 避免后续传输继续触发回调
slave.foe.set_busy_hook(None)