从站事件
通过 slave.events 访问 SlaveEvents 对象。使用 add_* 方法注册回调,remove_* 方法移除。
事件自动路由
主站级回调会自动路由到对应从站的 slave.events。订阅从站事件无需 master_index/slave_index 参数,系统已自动过滤到当前从站。
功能概览
| 类别 | 方法 | 说明 |
|---|---|---|
| 状态事件 | slave.events.add_state_changed() | 从站 EtherCAT 状态变化 |
| 紧急消息 | slave.events.add_emergency() | CoE Emergency 紧急消息 |
| 热插拔 | slave.events.add_offline() / add_online() | 从站离线/上线 |
| DC 同步 | slave.events.add_dc_sync_lost() | DC 同步丢失 |
| 输入变化 | slave.events.add_input_changed() | 输入 PDO 数据变化 |
| FSoE | slave.events.add_fsoe_state_changed() 等 | FSoE 安全事件 |
从站状态事件
add_state_changed()
def add_state_changed(self, callback: Callable[[int, int], None]) -> None
注册从站状态变化监听。回调参数: (old_state, new_state)。
示例:
slave = master[1]
def on_state_changed(old_state, new_state):
print(f"状态变化: 0x{old_state:02X} -> 0x{new_state:02X}")
slave.events.add_state_changed(on_state_changed)
add_emergency()
def add_emergency(self, callback: Callable[[int, int, int, int, int], None]) -> None
注册紧急事件监听。回调参数: (error_code, error_reg, b1, w1, w2)。
示例:
slave.events.add_emergency(lambda ec, er, b1, w1, w2:
print(f"紧急消息: 错误码=0x{ec:04X}"))
add_offline() / add_online()
def add_offline(self, callback: Callable[[], None]) -> None
def add_online(self, callback: Callable[[], None]) -> None
注册从站离线/上线监听。无参数回调。
示例:
slave.events.add_offline(lambda: print("从站离线"))
slave.events.add_online(lambda: print("从站上线"))
add_dc_sync_lost()
def add_dc_sync_lost(self, callback: Callable[[int], None]) -> None
注册 DC 同步丢失监听。回调参数: (diff_ns)。
示例:
slave.events.add_dc_sync_lost(lambda diff_ns:
print(f"DC 同步丢失: 偏差 {diff_ns}ns"))
输入数据变化
add_input_changed()
def add_input_changed(self, callback: Callable[[], None]) -> None
注册输入 PDO 数据变化监听。仅当输入数据与上一周期不同时回调。
零开销
- 无变化时:不触发回调,零性能开销
- 有变化时:仅变化的从站收到回调,其他从站不受影响
示例:
slave.events.add_input_changed(lambda: print("输入数据变化"))
PDO 数据变化
inputs_mapping 回调
使用 slave.pdo.inputs_mapping(on_changed) 订阅输入 PDO 数据变化。底层检测到输入数据变化时自动触发结构体比较。
回调参数:
@dataclass
class PdoStructChangedEventArgs:
slave_index: int # 从站索引
is_input: bool # 是否为输入 PDO
previous: Any # 变化前的结构体值
current: Any # 当前结构体值
timestamp: float # 变化时间戳
示例:
import ctypes
class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]
def on_input_changed(e):
print(f"位置变化: {e.previous.actual_position} -> {e.current.actual_position}")
input_ref = slave.pdo.inputs_mapping(ServoInput, on_changed=on_input_changed)
与 add_input_changed 的区别
add_input_changed()— 仅通知"数据变了",需自行读取数据。开销最低。inputs_mapping(on_changed)— 自动提供变化前后的结构体值。开销略高,但更方便。- 两者可以同时使用,底层共享同一个变化检测机制。
FSoE 安全事件
# FSoE 状态变化 (old_state, new_state)
slave.events.add_fsoe_state_changed(lambda old, new:
print(f"FSoE: {old} -> {new}"))
# FSoE 错误 (error_code)
slave.events.add_fsoe_error(lambda code:
print(f"FSoE 错误: 0x{code:04X}"))
# FSoE 进入失效安全
slave.events.add_fsoe_failsafe_triggered(lambda:
print("进入失效安全模式!"))
# FSoE 安全数据更新 (data: bytes)
slave.events.add_fsoe_safe_data_updated(lambda data:
print(f"安全数据: {data.hex()}"))
# FSoE 数据交换 (input_data, output_data)
slave.events.add_fsoe_data_exchange(lambda inp, out:
print(f"输入: {inp.hex()}, 输出: {out.hex()}"))
移除监听器
所有 add_* 方法都有对应的 remove_* 方法:
slave.events.remove_state_changed(my_callback)
slave.events.remove_emergency(my_callback)
slave.events.remove_offline(my_callback)
slave.events.remove_online(my_callback)
slave.events.remove_dc_sync_lost(my_callback)
slave.events.remove_input_changed(my_callback)
线程安全
事件回调在非 UI 线程上触发。更新 UI 时需要线程同步:
import queue
data_queue = queue.Queue()
def on_input_changed():
data_queue.put(("input_changed", slave.index))
slave.events.add_input_changed(on_input_changed)
注意
回调中避免执行耗时操作。如需处理大量数据,将数据放入队列异步处理:
def on_pdo_changed(e):
data_queue.put(e.current) # 快速入队
slave.pdo.inputs_mapping(ServoInput, on_changed=on_pdo_changed)
完整示例
import ctypes
from ethercat import EtherCATMaster, EcState
class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]
with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.set_state(EcState.OP)
master.start()
slave = master[1]
# ===== 从站状态事件 =====
slave.events.add_state_changed(lambda old, new:
print(f"状态: 0x{old:02X} -> 0x{new:02X}"))
slave.events.add_emergency(lambda ec, er, b1, w1, w2:
print(f"紧急消息: 0x{ec:04X}"))
slave.events.add_offline(lambda: print("从站离线"))
slave.events.add_online(lambda: print("从站上线"))
slave.events.add_dc_sync_lost(lambda diff:
print(f"DC 同步丢失: {diff}ns"))
# ===== 输入数据变化 =====
slave.events.add_input_changed(lambda: print("输入数据变化"))
# ===== PDO 数据变化(自动结构体比较) =====
input_ref = slave.pdo.inputs_mapping(ServoInput,
on_changed=lambda e: print(f"位置: {e.current.actual_position}"))
# ===== FSoE 安全事件(协议专属) =====
if slave.fsoe is not None:
slave.events.add_fsoe_state_changed(lambda old, new:
print(f"FSoE: {old} -> {new}"))
slave.events.add_fsoe_error(lambda code:
print(f"FSoE 错误: 0x{code:04X}"))
slave.events.add_fsoe_failsafe_triggered(lambda:
print("进入失效安全模式!"))
slave.events.add_fsoe_safe_data_updated(lambda data:
print(f"安全数据: {data.hex()}"))
slave.events.add_fsoe_data_exchange(lambda inp, out:
print(f"输入: {inp.hex()}, 输出: {out.hex()}"))
import time
time.sleep(30)
master.stop()