跳到主要内容

事件

主站和从站事件通过 add_* 方法注册回调函数,通过 remove_* 方法移除。

主站级回调

主站级事件直接通过 master.on_* 方法注册底层回调。

高级事件系统

使用 MasterEventsSlaveEvents 类进行更精细的事件管理,支持多监听器和自动路由。

自动日志

所有事件(PDO 周期回调除外)触发时系统均自动记录日志,无论是否订阅。确保关键运行状态不会被静默丢失。

PDO 周期回调(on_pdo_cycle(sync=True/False))为高频回调,不记录日志。

功能概览

类别方法说明自动日志
PDO 周期回调master.on_pdo_cycle()PDO 周期回调(同步/异步)
状态事件master.on_state_change()从站状态变化
热插拔事件master.on_slave_discovery()从站发现/丢失
master.events.add_slave_identity_mismatch()从站身份不符(换成错误型号/低版本)
异常事件master.on_emergency()CoE Emergency 紧急消息
master.on_pdo_frame_loss()PDO 连续丢帧(按组独立跟踪)
master.set_dc_sync_lost_callback()DC 同步丢失
输入数据变化master.on_input_changed()从站输入 PDO 数据变化(原生检测)
冗余事件master.on_redundancy_mode_changed()冗余模式变化
master.events.add_slave_port_link_changed()从站端口 link 变化(P0-P3 断开/恢复)
日志master.set_log_callback()日志数据更新通知

PDO 周期回调

on_pdo_cycle()

def on_pdo_cycle(self, callback: Callable[[int], None], sync: bool = True) -> None

注册 PDO 周期回调。每个 PDO 周期触发一次。

参数:

  • callback — 回调函数 (master_index)
  • sync (bool) — True=同步回调(在 PDO 线程中直接执行,延迟最低但必须快速返回),False=异步回调(不阻塞实时 PDO 线程,推荐)

示例:

# 异步回调(推荐,不阻塞 PDO 线程)
def on_pdo(master_index):
status = master[1].coe.sdo_read_value(0x6041, 0, dtype='u16')

master.on_pdo_cycle(on_pdo, sync=False)

# 同步回调(在 PDO 线程中直接执行)
def on_pdo_sync(master_index):
input_ref = slave.pdo.bind_pdo_struct(ServoInput, is_input=True)
output_ref = slave.pdo.bind_pdo_struct(ServoOutput, is_input=False)
if output_ref:
output_ref.control_word = 0x000F

master.on_pdo_cycle(on_pdo_sync, sync=True)
注意

同步回调中不要执行耗时操作(如 SDO 读写、文件 I/O、锁等待),否则会阻塞实时线程导致丢帧。

状态事件

on_state_change()

def on_state_change(self, callback: Callable[[int, int, int, int], None],
sync: bool = True) -> None

注册从站状态变化回调。由后台线程监控并回调。始终自动记录日志。

回调参数:

  • master_index (int) — 主站索引
  • slave_index (int) — 从站索引(1-based)
  • old_state (int) — 变化前的状态(EcState 枚举值)
  • new_state (int) — 变化后的状态

示例:

from ethercat import EcState

def on_state_change(mi, si, old, new):
print(f"从站 {si}: 0x{old:02X} -> 0x{new:02X}")

slave = master[si]
if slave.error_code != 0:
print(f" 错误码: 0x{slave.error_code:04X}")

master.on_state_change(on_state_change)
error_code

slave.error_code 读取 AL Status Code,返回状态切换失败的具体原因。

热插拔事件

on_slave_discovery()

def on_slave_discovery(self, callback: Callable[[int, int, bool], None],
sync: bool = True) -> None

注册从站发现/丢失回调。热插拔断开时触发,PDO 线程内置的恢复状态机会自动恢复从站(零 PDO 影响)。始终自动记录日志。

回调参数:

  • master_index (int) — 主站索引
  • slave_index (int) — 从站索引(1-based)
  • is_found (bool) — True=从站上线,False=从站离线

示例:

def on_discovery(mi, si, found):
if found:
print(f"从站 {si} 上线")
else:
print(f"从站 {si} 离线")

master.on_slave_discovery(on_discovery)

is_slave_offline() (MasterEvents)

def is_slave_offline(self, slave_index: int) -> bool

查询从站是否已被事件系统确认为离线状态。需通过 MasterEvents 实例访问。

参数:

  • slave_index (int) — 从站索引(1-based)

返回值:

  • bool — 从站离线返回 True

示例:

from ethercat import MasterEvents

events = MasterEvents(master)
# ... 注册回调 ...

if events.is_slave_offline(2):
print("从站 2 处于离线状态")

# 获取所有离线从站集合
offline = events.offline_slaves # Set[int]
print(f"离线从站: {offline}")

slave_identity_mismatch (MasterEvents)

def add_slave_identity_mismatch(self,
callback: Callable[[SlaveIdentityMismatchEventArgs], None]) -> None

从站断电重插后身份不符时触发:EtherCAT 识别状态机读取到的 Vendor/Product 与配置不匹配,或 Revision 低于配置(向后兼容:实际 Revision ≥ 配置值视为匹配)。始终自动记录日志。

触发后从站进入 IDENT_REJECTED 状态,不会自动重探测(防止错设备反复刷屏)。操作员检查/更换设备后需调用 master.acknowledge_slave_replacement() 让 SDK 重新探测。

相关结构:

@dataclass
class SlaveIdentityMismatchEventArgs:
master_index: int # 主站索引
slave_index: int # 从站索引(1-based)
expected_vendor: int # 配置期望的厂商 ID
expected_product: int # 配置期望的产品代码
expected_revision: int # 配置期望的最低修订号
actual_vendor: int # 当前实际厂商 ID
actual_product: int # 当前实际产品代码
actual_revision: int # 当前实际修订号

示例:

from ethercat import MasterEvents, SlaveIdentityMismatchEventArgs

events = MasterEvents(master)

def on_identity_mismatch(args: SlaveIdentityMismatchEventArgs):
print(f"从站 {args.slave_index} 身份不符:")
print(f" 期望: Vendor=0x{args.expected_vendor:08X}, "
f"Product=0x{args.expected_product:08X}, Rev>=0x{args.expected_revision:08X}")
print(f" 实际: Vendor=0x{args.actual_vendor:08X}, "
f"Product=0x{args.actual_product:08X}, Rev=0x{args.actual_revision:08X}")
# UI 弹窗提示用户换回正确设备, 确认后调用:
# master.acknowledge_slave_replacement(args.slave_index)

events.add_slave_identity_mismatch(on_identity_mismatch)
去重机制

同一从站进入 IDENT_REJECTED 状态仅触发一次事件。调用 acknowledge_slave_replacement() 后重置探测,身份仍不匹配会再次触发。

异常事件

on_emergency()

def on_emergency(self, callback: Callable) -> None

注册 CoE Emergency 紧急消息回调。从站固件检测到错误时发送,数据格式遵循 CANopen Emergency 协议(CiA 301)。始终自动记录日志。

回调参数: (master_index, slave_index, error_code, error_reg, b1, w1, w2)

  • master_index (int) — 主站索引
  • slave_index (int) — 从站索引(1-based)
  • error_code (int) — 错误代码(CANopen Emergency Error Code,对象 0x603F)
  • error_reg (int) — 错误寄存器(对象 0x1001)
  • b1 (int) — 制造商特定数据(字节 3)
  • w1 (int) — 制造商特定数据(字节 4-5)
  • w2 (int) — 制造商特定数据(字节 6-7)

常见 Emergency Error Code:

  • 0x0000 — 错误已复位
  • 0x1000 — 通用错误
  • 0x2000 — 电流错误
  • 0x3000 — 电压错误
  • 0x4000 — 温度错误
  • 0x5000 — 设备硬件错误
  • 0x6000 — 设备软件错误
  • 0x7000 — 附加模块错误
  • 0x8000 — 监控错误(通信)
  • 0xFF00 — 制造商特定错误

示例:

def on_emcy(mi, si, err_code, err_reg, b1, w1, w2):
print(f"从站 {si} 紧急消息: 错误码=0x{err_code:04X}, 寄存器=0x{err_reg:02X}")

master.on_emergency(on_emcy)

on_pdo_frame_loss()

def on_pdo_frame_loss(self, callback: Callable[[int, int, int, int], None]) -> None

注册 PDO 连续丢帧回调。当连续丢帧达到阈值时触发,单次丢帧仅计数不触发。每组独立跟踪。始终自动记录日志。

回调参数: (master_index, group, consecutive_lost, total_lost)

  • master_index (int) — 主站索引
  • group (int) — 发生丢帧的组号(0-7),对应从站分组
  • consecutive_lost (int) — 该组连续丢帧数
  • total_lost (int) — 该组累计丢帧数

示例:

def on_loss(mi, group, consecutive, total):
print(f"组 {group} 丢帧: 连续={consecutive}, 累计={total}")

master.on_pdo_frame_loss(on_loss)
丢帧统计

PDO 丢帧的详细统计(按组查询)请参考 主站诊断 - PDO 丢帧

set_dc_sync_lost_callback()

def set_dc_sync_lost_callback(self, callback: Optional[Callable[[int, int, int], None]]) -> None

设置 DC 同步丢失回调。当从站从同步->失同步(超出阈值)时触发一次,持续超出不重复触发。传入 None 清除回调。始终自动记录日志。

回调参数: (master_index, slave_index, diff_ns)

  • master_index (int) — 主站索引
  • slave_index (int) — 失同步的从站索引(1-based)
  • diff_ns (int) — 当前与参考时钟的时间差(纳秒)

示例:

master.diagnostics_info.sync_window_threshold = 500  # 500ns

def on_sync_lost(mi, si, diff_ns):
print(f"从站 {si} DC 同步丢失: 偏差 {diff_ns}ns")

master.set_dc_sync_lost_callback(on_sync_lost)
单个从站 DC 诊断

slave.diagnostics.dc.is_in_syncslave.diagnostics.dc.sync_time_difference 可查询从站当前同步状态。详见 从站诊断 - DC 同步

输入数据变化事件

on_input_changed()

def on_input_changed(self, callback: Callable[[int, List[int]], None]) -> None

注册输入 PDO 数据变化回调。仅当从站输入数据与上一周期不同时回调。

零开销设计
  • 无变化时: 不触发回调,零开销
  • 有变化时: 仅触发变化从站的回调
  • 检测精度: 逐字节比较,任何一个 bit 变化都能检测到

回调参数:

  • master_index (int) — 主站索引
  • changed_slave_indices (List[int]) — 数据变化的从站索引列表

示例:

def on_input_changed(mi, changed_slaves):
for si in changed_slaves:
slave = master[si]
raw = slave.pdo.inputs
if raw:
print(f"从站 {si} 输入变化: {raw[:4].hex()}")

master.on_input_changed(on_input_changed)
使用方式对比
  • slave.events.add_input_changed() — 仅需知道"变了",自行读取数据
  • master.on_input_changed() — 统一监控所有从站变化
  • master.on_pdo_cycle(sync=True) — 每周期都需要处理数据

详见 从站事件

与 on_pdo_cycle 的关系

on_input_changedon_pdo_cycle(sync=True) 之前触发。这意味着在同步回调中可以确信变化事件已经分发完毕。两者可以同时使用,互不影响。

冗余事件

on_redundancy_mode_changed()

def on_redundancy_mode_changed(self, callback: Callable[[int, int, int], None]) -> None

冗余运行模式发生变化时触发。old_mode / new_mode 对应 RingMode 枚举值(0=Inactive, 1=Dual, 2=Degraded)。

回调参数: (master_index, old_mode, new_mode)

示例:

def on_red_change(mi, old, new):
modes = {0: "Inactive", 1: "Dual", 2: "Degraded"}
print(f"冗余模式: {modes.get(old)} -> {modes.get(new)}")

master.on_redundancy_mode_changed(on_red_change)
def add_slave_port_link_changed(self,
callback: Callable[[int, int, int, bool], None]) -> None

从站 ESC 端口物理链路变化时触发(P0-P3 之一断开或恢复)。每秒诊断周期检测 DL Status 寄存器的 link bit,从 1→0 触发"断开",从 0→1 触发"恢复"。始终自动记录日志。

用于精确定位故障线缆段(相邻从站的对向端口同时报断,说明中间线缆有问题)。

回调参数:

  • master_index (int) — 主站索引
  • slave_index (int) — 从站索引(1-based)
  • port (int) — 端口号 0-3,对应 P0 / P1 / P2 / P3
  • is_up (bool) — True=link 恢复, False=link 断开

示例:

from ethercat import MasterEvents

events = MasterEvents(master)

def on_port_link(mi, si, port, is_up):
action = "恢复" if is_up else "断开"
print(f"从站 {si} 端口 P{port} link {action}")

events.add_slave_port_link_changed(on_port_link)
配合断点定位

master.diagnostics.break_point 提供聚合后的故障点视图(断线 + CRC 故障)。slave_port_link_changed 是原始事件源,适合做实时告警。详见 主站诊断 - 冗余状态

日志回调

set_log_callback()

def set_log_callback(self, callback: Callable[[int, str], None]) -> None

设置日志回调。

回调参数:

  • category (int) — 日志类别
  • message (str) — 日志消息

示例:

def on_log(category, message):
labels = {0: "错误", 1: "警告", 2: "消息", 3: "邮箱", 4: "PDO", 5: "调试"}
print(f"[{labels.get(category, str(category))}] {message}")

master.set_log_callback(on_log)

高级事件系统 (MasterEvents)

MasterEvents 类提供多监听器支持和从站级事件自动路由。所有事件回调统一注册,MasterEvents 自动分发到对应的监听器。

from ethercat import MasterEvents

events = MasterEvents(master)

# === PDO 周期回调 ===
events.add_pdo_cyclic_async(lambda mi: None) # 异步回调(推荐)
events.add_pdo_cyclic_sync(lambda mi: None) # 同步回调

# === 主站状态变化 ===
events.add_state_changed(lambda old, new:
print(f"主站: 0x{old:02X} -> 0x{new:02X}"))

# === 从站状态变化 ===
events.add_slave_state_changed(lambda mi, si, old, new:
print(f"从站 {si}: 0x{old:02X} -> 0x{new:02X}"))

# === 热插拔 ===
events.add_slave_offline(lambda si: print(f"从站 {si} 离线"))
events.add_slave_online(lambda si: print(f"从站 {si} 上线"))

# === 异常事件 ===
events.add_pdo_frame_loss(lambda mi, grp, cons, total:
print(f"组 {grp} 丢帧: {cons}"))

events.add_emergency(lambda mi, si, ec, er, b1, w1, w2:
print(f"从站 {si} 紧急: 0x{ec:04X}"))

events.add_dc_sync_lost(lambda mi, si, diff:
print(f"从站 {si} DC 同步丢失: {diff}ns"))

# === 输入数据变化 ===
events.add_input_data_changed(lambda mi, si:
print(f"从站 {si} 输入数据变化"))

# === 冗余 ===
events.add_redundancy_mode_changed(lambda mi, old, new:
print(f"冗余: {old} -> {new}"))

# === 从站端口链路变化 (断线 / 恢复) ===
events.add_slave_port_link_changed(lambda mi, si, port, is_up:
print(f"从站 {si} P{port} {'恢复' if is_up else '断开'}"))

# === 从站身份不符 (热插拔自修复) ===
events.add_slave_identity_mismatch(lambda args:
print(f"从站 {args.slave_index} 身份不符, 实际 Vendor=0x{args.actual_vendor:08X}"))

# === 查询从站离线状态 ===
if events.is_slave_offline(2):
print("从站 2 处于离线状态")

# === 移除监听器 ===
events.remove_slave_offline(my_callback)

clear_all() (MasterEvents)

def clear_all(self) -> None

清除所有事件订阅,防止内存泄漏。一次性移除所有通过 add_* 注册的回调,同时清空离线从站跟踪集合。

示例:

events = MasterEvents(master)
events.add_slave_offline(lambda si: print(f"从站 {si} 离线"))
events.add_emergency(lambda mi, si, ec, er, b1, w1, w2: None)

# 清除所有订阅
events.clear_all()

从站级事件 (SlaveEvents)

从站级事件通过 slave.events 访问(需要先通过 master[n] 获取从站实例)。事件参数已自动过滤到当前从站,不含 master_index / slave_index

slave = master[1]

# 注册从站状态变化
slave.events.add_state_changed(lambda old, new:
print(f"状态: 0x{old:02X} -> 0x{new:02X}"))

# 紧急消息 (error_code, error_reg, b1, w1, w2)
slave.events.add_emergency(lambda ec, er, b1, w1, w2:
print(f"紧急: 0x{ec:04X}"))

# 离线/上线
slave.events.add_offline(lambda: print("从站离线"))
slave.events.add_online(lambda: print("从站上线"))

# DC 同步丢失 (diff_ns)
slave.events.add_dc_sync_lost(lambda diff: print(f"DC 同步丢失: {diff}ns"))

# 输入数据变化 (无参数)
slave.events.add_input_changed(lambda: print("输入数据变化"))

# 移除监听器
slave.events.remove_state_changed(my_callback)
slave.events.remove_emergency(my_callback)
slave.events.remove_offline(my_callback)
slave.events.remove_online(my_callback)
slave.events.remove_dc_sync_lost(my_callback)
slave.events.remove_input_changed(my_callback)

clear_all() (SlaveEvents)

def clear_all(self) -> None

清除该从站的所有事件订阅(包括 FSoE 事件),防止内存泄漏。

示例:

slave = master[1]
slave.events.add_state_changed(lambda old, new: None)
slave.events.add_emergency(lambda ec, er, b1, w1, w2: None)

# 清除该从站的所有订阅
slave.events.clear_all()

线程安全

事件回调在非 UI 线程上触发。如需更新 GUI 或在主线程处理数据,需使用对应的线程同步机制:

import queue

# 使用 queue 跨线程传递事件(推荐)
event_queue = queue.Queue()

def on_discovery(mi, si, found):
event_queue.put(("discovery", si, found))

master.on_slave_discovery(on_discovery)

# 主线程消费
while True:
event_type, idx, found = event_queue.get()
print(f"从站 {idx} {'上线' if found else '离线'}")
GIL 与性能

Python GIL 确保回调是线程安全的,但要注意:

  • 同步 PDO 回调(sync=True)在底层实时线程中执行,必须快速返回
  • 不要在同步回调中进行 I/O 操作、加锁或 SDO 读写
  • 推荐使用异步回调(sync=False)或 queue.Queue 跨线程传递数据

完整示例

from ethercat import EtherCATMaster, EcState

with EtherCATMaster() as master:
# ===== 日志 =====
master.set_log_callback(lambda cat, msg: print(f"[{cat}] {msg}"))

# ===== 状态变化 =====
master.on_state_change(lambda mi, si, old, new:
print(f"从站 {si}: 0x{old:02X} -> 0x{new:02X}"))

# ===== 紧急消息 =====
master.on_emergency(lambda mi, si, ec, er, b1, w1, w2:
print(f"从站 {si} 紧急: 0x{ec:04X}"))

# ===== 从站发现/丢失 =====
master.on_slave_discovery(lambda mi, si, found:
print(f"从站 {si} {'上线' if found else '离线'}"))

# ===== PDO 丢帧 =====
master.on_pdo_frame_loss(lambda mi, grp, cons, total:
print(f"组 {grp} 丢帧: 连续={cons}, 累计={total}"))

# ===== DC 同步丢失 =====
master.set_dc_sync_lost_callback(lambda mi, si, diff:
print(f"从站 {si} DC 同步丢失: {diff}ns"))

# ===== 输入数据变化 =====
master.on_input_changed(lambda mi, changed:
print(f"输入变化: {changed}"))

# ===== 冗余模式变化 =====
master.on_redundancy_mode_changed(lambda mi, old, new:
print(f"冗余: {old} -> {new}"))

# ===== PDO 周期回调(异步) =====
master.on_pdo_cycle(lambda mi: None, sync=False)

# 连接和运行
master.set_network(r"\\Device\\NPF_{GUID}")
master.set_state(EcState.OP)
master.start()