事件
主站和从站事件通过 add_* 方法注册回调函数,通过 remove_* 方法移除。
主站级事件直接通过 master.on_* 方法注册底层回调。
使用 MasterEvents 和 SlaveEvents 类进行更精细的事件管理,支持多监听器和自动路由。
所有事件(PDO 周期回调除外)触发时系统均自动记录日志,无论是否订阅。确保关键运行状态不会被静默丢失。
PDO 周期回调(on_pdo_cycle(sync=True/False))为高频回调,不记录日志。
功能概览
| 类别 | 方法 | 说明 | 自动日志 |
|---|---|---|---|
| PDO 周期回调 | master.on_pdo_cycle() | PDO 周期回调(同步/异步) | — |
| 状态事件 | master.on_state_change() | 从站状态变化 | ✓ |
| 热插拔事件 | master.on_slave_discovery() | 从站发现/丢失 | ✓ |
| master.events.add_slave_identity_mismatch() | 从站身份不符(换成错误型号/低版本) | ✓ | |
| 异常事件 | master.on_emergency() | CoE Emergency 紧急消息 | ✓ |
| master.on_pdo_frame_loss() | PDO 连续丢帧(按组独立跟踪) | ✓ | |
| master.set_dc_sync_lost_callback() | DC 同步丢失 | ✓ | |
| 输入数据变化 | master.on_input_changed() | 从站输入 PDO 数据变化(原生检测) | — |
| 冗余事件 | master.on_redundancy_mode_changed() | 冗余模式变化 | ✓ |
| master.events.add_slave_port_link_changed() | 从站端口 link 变化(P0-P3 断开/恢复) | ✓ | |
| 日志 | master.set_log_callback() | 日志数据更新通知 | — |
PDO 周期回调
on_pdo_cycle()
def on_pdo_cycle(self, callback: Callable[[int], None], sync: bool = True) -> None
注册 PDO 周期回调。每个 PDO 周期触发一次。
参数:
callback— 回调函数(master_index)sync(bool) —True=同步回调(在 PDO 线程中直接执行,延迟最低但必须快速返回),False=异步回调(不阻塞实时 PDO 线程,推荐)
示例:
# 异步回调(推荐,不阻塞 PDO 线程)
def on_pdo(master_index):
status = master[1].coe.sdo_read_value(0x6041, 0, dtype='u16')
master.on_pdo_cycle(on_pdo, sync=False)
# 同步回调(在 PDO 线程中直接执行)
def on_pdo_sync(master_index):
input_ref = slave.pdo.bind_pdo_struct(ServoInput, is_input=True)
output_ref = slave.pdo.bind_pdo_struct(ServoOutput, is_input=False)
if output_ref:
output_ref.control_word = 0x000F
master.on_pdo_cycle(on_pdo_sync, sync=True)
同步回调中不要执行耗时操作(如 SDO 读写、文件 I/O、锁等待),否则会阻塞实时线程导致丢帧。
状态事件
on_state_change()
def on_state_change(self, callback: Callable[[int, int, int, int], None],
sync: bool = True) -> None
注册从站状态变化回调。由后台线程监控并回调。始终自动记录日志。
回调参数:
master_index(int) — 主站索引slave_index(int) — 从站索引(1-based)old_state(int) — 变化前的状态(EcState枚举值)new_state(int) — 变化后的状态
示例:
from ethercat import EcState
def on_state_change(mi, si, old, new):
print(f"从站 {si}: 0x{old:02X} -> 0x{new:02X}")
slave = master[si]
if slave.error_code != 0:
print(f" 错误码: 0x{slave.error_code:04X}")
master.on_state_change(on_state_change)
slave.error_code 读取 AL Status Code,返回状态切换失败的具体原因。
热插拔事件
on_slave_discovery()
def on_slave_discovery(self, callback: Callable[[int, int, bool], None],
sync: bool = True) -> None
注册从站发现/丢失回调。热插拔断开时触发,PDO 线程内置的恢复状态机会自动恢复从站(零 PDO 影响)。始终自动记录日志。
回调参数:
master_index(int) — 主站索引slave_index(int) — 从站索引(1-based)is_found(bool) —True=从站上线,False=从站离线
示例:
def on_discovery(mi, si, found):
if found:
print(f"从站 {si} 上线")
else:
print(f"从站 {si} 离线")
master.on_slave_discovery(on_discovery)
is_slave_offline() (MasterEvents)
def is_slave_offline(self, slave_index: int) -> bool
查询从站是否已被事件系统确认为离线状态。需通过 MasterEvents 实例访问。
参数:
slave_index(int) — 从站索引(1-based)
返回值:
bool— 从站离线返回True
示例:
from ethercat import MasterEvents
events = MasterEvents(master)
# ... 注册回调 ...
if events.is_slave_offline(2):
print("从站 2 处于离线状态")
# 获取所有离线从站集合
offline = events.offline_slaves # Set[int]
print(f"离线从站: {offline}")
slave_identity_mismatch (MasterEvents)
def add_slave_identity_mismatch(self,
callback: Callable[[SlaveIdentityMismatchEventArgs], None]) -> None
从站断电重插后身份不符时触发:EtherCAT 识别状态机读取到的 Vendor/Product 与配置不匹配,或 Revision 低于配置(向后兼容:实际 Revision ≥ 配置值视为匹配)。始终自动记录日志。
触发后从站进入 IDENT_REJECTED 状态,不会自动重探测(防止错设备反复刷屏)。操作员检查/更换设备后需调用 master.acknowledge_slave_replacement() 让 SDK 重新探测。
相关结构:
@dataclass
class SlaveIdentityMismatchEventArgs:
master_index: int # 主站索引
slave_index: int # 从站索引(1-based)
expected_vendor: int # 配置期望的厂商 ID
expected_product: int # 配置期望的产品代码
expected_revision: int # 配置期望的最低修订号
actual_vendor: int # 当前实际厂商 ID
actual_product: int # 当前实际产品代码
actual_revision: int # 当前实际修订号
示例:
from ethercat import MasterEvents, SlaveIdentityMismatchEventArgs
events = MasterEvents(master)
def on_identity_mismatch(args: SlaveIdentityMismatchEventArgs):
print(f"从站 {args.slave_index} 身份不符:")
print(f" 期望: Vendor=0x{args.expected_vendor:08X}, "
f"Product=0x{args.expected_product:08X}, Rev>=0x{args.expected_revision:08X}")
print(f" 实际: Vendor=0x{args.actual_vendor:08X}, "
f"Product=0x{args.actual_product:08X}, Rev=0x{args.actual_revision:08X}")
# UI 弹窗提示用户换回正确设备, 确认后调用:
# master.acknowledge_slave_replacement(args.slave_index)
events.add_slave_identity_mismatch(on_identity_mismatch)
同一从站进入 IDENT_REJECTED 状态仅触发一次事件。调用 acknowledge_slave_replacement() 后重置探测,身份仍不匹配会再次触发。
异常事件
on_emergency()
def on_emergency(self, callback: Callable) -> None
注册 CoE Emergency 紧急消息回调。从站固件检测到错误时发送,数据格式遵循 CANopen Emergency 协议(CiA 301)。始终自动记录日志。
回调参数:
(master_index, slave_index, error_code, error_reg, b1, w1, w2)
master_index(int) — 主站索引slave_index(int) — 从站索引(1-based)error_code(int) — 错误代码(CANopen Emergency Error Code,对象 0x603F)error_reg(int) — 错误寄存器(对象 0x1001)b1(int) — 制造商特定数据(字节 3)w1(int) — 制造商特定数据(字节 4-5)w2(int) — 制造商特定数据(字节 6-7)
常见 Emergency Error Code:
0x0000— 错误已复位0x1000— 通用错误0x2000— 电流错误0x3000— 电压错误0x4000— 温度错误0x5000— 设备硬件错误0x6000— 设备软件错误0x7000— 附加模块错误0x8000— 监控错误(通信)0xFF00— 制造商特定错误
示例:
def on_emcy(mi, si, err_code, err_reg, b1, w1, w2):
print(f"从站 {si} 紧急消息: 错误码=0x{err_code:04X}, 寄存器=0x{err_reg:02X}")
master.on_emergency(on_emcy)
on_pdo_frame_loss()
def on_pdo_frame_loss(self, callback: Callable[[int, int, int, int], None]) -> None
注册 PDO 连续丢帧回调。当连续丢帧达到阈值时触发,单次丢帧仅计数不触发。每组独立跟踪。始终自动记录日志。
回调参数:
(master_index, group, consecutive_lost, total_lost)
master_index(int) — 主站索引group(int) — 发生丢帧的组号(0-7),对应从站分组consecutive_lost(int) — 该组连续丢帧数total_lost(int) — 该组累计丢帧数
示例:
def on_loss(mi, group, consecutive, total):
print(f"组 {group} 丢帧: 连续={consecutive}, 累计={total}")
master.on_pdo_frame_loss(on_loss)
PDO 丢帧的详细统计(按组查询)请参考 主站诊断 - PDO 丢帧。
set_dc_sync_lost_callback()
def set_dc_sync_lost_callback(self, callback: Optional[Callable[[int, int, int], None]]) -> None
设置 DC 同步丢失回调。当从站从同步->失同步(超出阈值)时触发一次,持续超出不重复触发。传入 None 清除回调。始终自动记录日志。
回调参数:
(master_index, slave_index, diff_ns)
master_index(int) — 主站索引slave_index(int) — 失同步的从站索引(1-based)diff_ns(int) — 当前与参考时钟的时间差(纳秒)
示例:
master.diagnostics_info.sync_window_threshold = 500 # 500ns
def on_sync_lost(mi, si, diff_ns):
print(f"从站 {si} DC 同步丢失: 偏差 {diff_ns}ns")
master.set_dc_sync_lost_callback(on_sync_lost)
slave.diagnostics.dc.is_in_sync 和 slave.diagnostics.dc.sync_time_difference 可查询从站当前同步状态。详见 从站诊断 - DC 同步。
输入数据变化事件
on_input_changed()
def on_input_changed(self, callback: Callable[[int, List[int]], None]) -> None
注册输入 PDO 数据变化回调。仅当从站输入数据与上一周期不同时回调。
- 无变化时: 不触发回调,零开销
- 有变化时: 仅触发变化从站的回调
- 检测精度: 逐字节比较,任何一个 bit 变化都能检测到
回调参数:
master_index(int) — 主站索引changed_slave_indices(List[int]) — 数据变化的从站索引列表
示例:
def on_input_changed(mi, changed_slaves):
for si in changed_slaves:
slave = master[si]
raw = slave.pdo.inputs
if raw:
print(f"从站 {si} 输入变化: {raw[:4].hex()}")
master.on_input_changed(on_input_changed)
slave.events.add_input_changed()— 仅需知道"变了",自行读取数据master.on_input_changed()— 统一监控所有从站变化master.on_pdo_cycle(sync=True)— 每周期都需要处理数据
详见 从站事件。
on_input_changed 在 on_pdo_cycle(sync=True) 之前触发。这意味着在同步回调中可以确信变化事件已经分发完毕。两者可以同时使用,互不影响。
冗余事件
on_redundancy_mode_changed()
def on_redundancy_mode_changed(self, callback: Callable[[int, int, int], None]) -> None
冗余运行模式发生变化时触发。old_mode / new_mode 对应 RingMode 枚举值(0=Inactive, 1=Dual, 2=Degraded)。
回调参数:
(master_index, old_mode, new_mode)
示例:
def on_red_change(mi, old, new):
modes = {0: "Inactive", 1: "Dual", 2: "Degraded"}
print(f"冗余模式: {modes.get(old)} -> {modes.get(new)}")
master.on_redundancy_mode_changed(on_red_change)
slave_port_link_changed (MasterEvents)
def add_slave_port_link_changed(self,
callback: Callable[[int, int, int, bool], None]) -> None
从站 ESC 端口物理链路变化时触发(P0-P3 之一断开或恢复)。每秒诊断周期检测 DL Status 寄存器的 link bit,从 1→0 触发"断开",从 0→1 触发"恢复"。始终自动记录日志。
用于精确定位故障线缆段(相邻从站的对向端口同时报断,说明中间线缆有问题)。
回调参数:
master_index(int) — 主站索引slave_index(int) — 从站索引(1-based)port(int) — 端口号0-3,对应P0/P1/P2/P3is_up(bool) —True=link 恢复,False=link 断开
示例:
from ethercat import MasterEvents
events = MasterEvents(master)
def on_port_link(mi, si, port, is_up):
action = "恢复" if is_up else "断开"
print(f"从站 {si} 端口 P{port} link {action}")
events.add_slave_port_link_changed(on_port_link)
master.diagnostics.break_point 提供聚合后的故障点视图(断线 + CRC 故障)。slave_port_link_changed 是原始事件源,适合做实时告警。详见 主站诊断 - 冗余状态。
日志回调
set_log_callback()
def set_log_callback(self, callback: Callable[[int, str], None]) -> None
设置日志回调。
回调参数:
category(int) — 日志类别message(str) — 日志消息
示例:
def on_log(category, message):
labels = {0: "错误", 1: "警告", 2: "消息", 3: "邮箱", 4: "PDO", 5: "调试"}
print(f"[{labels.get(category, str(category))}] {message}")
master.set_log_callback(on_log)
高级事件系统 (MasterEvents)
MasterEvents 类提供多监听器支持和从站级事件自动路由。所有事件回调统一注册,MasterEvents 自动分发到对应的监听器。
from ethercat import MasterEvents
events = MasterEvents(master)
# === PDO 周期回调 ===
events.add_pdo_cyclic_async(lambda mi: None) # 异步回调(推荐)
events.add_pdo_cyclic_sync(lambda mi: None) # 同步回调
# === 主站状态变化 ===
events.add_state_changed(lambda old, new:
print(f"主站: 0x{old:02X} -> 0x{new:02X}"))
# === 从站状态变化 ===
events.add_slave_state_changed(lambda mi, si, old, new:
print(f"从站 {si}: 0x{old:02X} -> 0x{new:02X}"))
# === 热插拔 ===
events.add_slave_offline(lambda si: print(f"从站 {si} 离线"))
events.add_slave_online(lambda si: print(f"从站 {si} 上线"))
# === 异常事件 ===
events.add_pdo_frame_loss(lambda mi, grp, cons, total:
print(f"组 {grp} 丢帧: {cons}"))
events.add_emergency(lambda mi, si, ec, er, b1, w1, w2:
print(f"从站 {si} 紧急: 0x{ec:04X}"))
events.add_dc_sync_lost(lambda mi, si, diff:
print(f"从站 {si} DC 同步丢失: {diff}ns"))
# === 输入数据变化 ===
events.add_input_data_changed(lambda mi, si:
print(f"从站 {si} 输入数据变化"))
# === 冗余 ===
events.add_redundancy_mode_changed(lambda mi, old, new:
print(f"冗余: {old} -> {new}"))
# === 从站端口链路变化 (断线 / 恢复) ===
events.add_slave_port_link_changed(lambda mi, si, port, is_up:
print(f"从站 {si} P{port} {'恢复' if is_up else '断开'}"))
# === 从站身份不符 (热插拔自修复) ===
events.add_slave_identity_mismatch(lambda args:
print(f"从站 {args.slave_index} 身份不符, 实际 Vendor=0x{args.actual_vendor:08X}"))
# === 查询从站离线状态 ===
if events.is_slave_offline(2):
print("从站 2 处于离线状态")
# === 移除监听器 ===
events.remove_slave_offline(my_callback)
clear_all() (MasterEvents)
def clear_all(self) -> None
清除所有事件订阅,防止内存泄漏。一次性移除所有通过 add_* 注册的回调,同时清空离线从站跟踪集合。
示例:
events = MasterEvents(master)
events.add_slave_offline(lambda si: print(f"从站 {si} 离线"))
events.add_emergency(lambda mi, si, ec, er, b1, w1, w2: None)
# 清除所有订阅
events.clear_all()
从站级事件 (SlaveEvents)
从站级事件通过 slave.events 访问(需要先通过 master[n] 获取从站实例)。事件参数已自动过滤到当前从站,不含 master_index / slave_index。
slave = master[1]
# 注册从站状态变化
slave.events.add_state_changed(lambda old, new:
print(f"状态: 0x{old:02X} -> 0x{new:02X}"))
# 紧急消息 (error_code, error_reg, b1, w1, w2)
slave.events.add_emergency(lambda ec, er, b1, w1, w2:
print(f"紧急: 0x{ec:04X}"))
# 离线/上线
slave.events.add_offline(lambda: print("从站离线"))
slave.events.add_online(lambda: print("从站上线"))
# DC 同步丢失 (diff_ns)
slave.events.add_dc_sync_lost(lambda diff: print(f"DC 同步丢失: {diff}ns"))
# 输入数据变化 (无参数)
slave.events.add_input_changed(lambda: print("输入数据变化"))
# 移除监听器
slave.events.remove_state_changed(my_callback)
slave.events.remove_emergency(my_callback)
slave.events.remove_offline(my_callback)
slave.events.remove_online(my_callback)
slave.events.remove_dc_sync_lost(my_callback)
slave.events.remove_input_changed(my_callback)
clear_all() (SlaveEvents)
def clear_all(self) -> None
清除该从站的所有事件订阅(包括 FSoE 事件),防止内存泄漏。
示例:
slave = master[1]
slave.events.add_state_changed(lambda old, new: None)
slave.events.add_emergency(lambda ec, er, b1, w1, w2: None)
# 清除该从站的所有订阅
slave.events.clear_all()
线程安全
事件回调在非 UI 线程上触发。如需更新 GUI 或在主线程处理数据,需使用对应的线程同步机制:
import queue
# 使用 queue 跨线程传递事件(推荐)
event_queue = queue.Queue()
def on_discovery(mi, si, found):
event_queue.put(("discovery", si, found))
master.on_slave_discovery(on_discovery)
# 主线程消费
while True:
event_type, idx, found = event_queue.get()
print(f"从站 {idx} {'上线' if found else '离线'}")
Python GIL 确保回调是线程安全的,但要注意:
- 同步 PDO 回调(
sync=True)在底层实时线程中执行,必须快速返回 - 不要在同步回调中进行 I/O 操作、加锁或 SDO 读写
- 推荐使用异步回调(
sync=False)或queue.Queue跨线程传递数据
完整示例
from ethercat import EtherCATMaster, EcState
with EtherCATMaster() as master:
# ===== 日志 =====
master.set_log_callback(lambda cat, msg: print(f"[{cat}] {msg}"))
# ===== 状态变化 =====
master.on_state_change(lambda mi, si, old, new:
print(f"从站 {si}: 0x{old:02X} -> 0x{new:02X}"))
# ===== 紧急消息 =====
master.on_emergency(lambda mi, si, ec, er, b1, w1, w2:
print(f"从站 {si} 紧急: 0x{ec:04X}"))
# ===== 从站发现/丢失 =====
master.on_slave_discovery(lambda mi, si, found:
print(f"从站 {si} {'上线' if found else '离线'}"))
# ===== PDO 丢帧 =====
master.on_pdo_frame_loss(lambda mi, grp, cons, total:
print(f"组 {grp} 丢帧: 连续={cons}, 累计={total}"))
# ===== DC 同步丢失 =====
master.set_dc_sync_lost_callback(lambda mi, si, diff:
print(f"从站 {si} DC 同步丢失: {diff}ns"))
# ===== 输入数据变化 =====
master.on_input_changed(lambda mi, changed:
print(f"输入变化: {changed}"))
# ===== 冗余模式变化 =====
master.on_redundancy_mode_changed(lambda mi, old, new:
print(f"冗余: {old} -> {new}"))
# ===== PDO 周期回调(异步) =====
master.on_pdo_cycle(lambda mi: None, sync=False)
# 连接和运行
master.set_network(r"\\Device\\NPF_{GUID}")
master.set_state(EcState.OP)
master.start()