跳到主要内容

从站事件

通过 slave.events 访问 SlaveEvents 对象。使用 add_* 方法注册回调,remove_* 方法移除。

事件自动路由

主站级回调会自动路由到对应从站的 slave.events。订阅从站事件无需 master_index/slave_index 参数,系统已自动过滤到当前从站。

功能概览

类别方法说明
状态事件slave.events.add_state_changed()从站 EtherCAT 状态变化
紧急消息slave.events.add_emergency()CoE Emergency 紧急消息
热插拔slave.events.add_offline() / add_online()从站离线/上线
DC 同步slave.events.add_dc_sync_lost()DC 同步丢失
输入变化slave.events.add_input_changed()输入 PDO 数据变化
FSoEslave.events.add_fsoe_state_changed() 等FSoE 安全事件

从站状态事件

add_state_changed()

def add_state_changed(self, callback: Callable[[int, int], None]) -> None

注册从站状态变化监听。回调参数: (old_state, new_state)

示例:

slave = master[1]

def on_state_changed(old_state, new_state):
print(f"状态变化: 0x{old_state:02X} -> 0x{new_state:02X}")

slave.events.add_state_changed(on_state_changed)

add_emergency()

def add_emergency(self, callback: Callable[[int, int, int, int, int], None]) -> None

注册紧急事件监听。回调参数: (error_code, error_reg, b1, w1, w2)

示例:

slave.events.add_emergency(lambda ec, er, b1, w1, w2:
print(f"紧急消息: 错误码=0x{ec:04X}"))

add_offline() / add_online()

def add_offline(self, callback: Callable[[], None]) -> None
def add_online(self, callback: Callable[[], None]) -> None

注册从站离线/上线监听。无参数回调。

示例:

slave.events.add_offline(lambda: print("从站离线"))
slave.events.add_online(lambda: print("从站上线"))

add_dc_sync_lost()

def add_dc_sync_lost(self, callback: Callable[[int], None]) -> None

注册 DC 同步丢失监听。回调参数: (diff_ns)

示例:

slave.events.add_dc_sync_lost(lambda diff_ns:
print(f"DC 同步丢失: 偏差 {diff_ns}ns"))

输入数据变化

add_input_changed()

def add_input_changed(self, callback: Callable[[], None]) -> None

注册输入 PDO 数据变化监听。仅当输入数据与上一周期不同时回调。

零开销
  • 无变化时:不触发回调,零性能开销
  • 有变化时:仅变化的从站收到回调,其他从站不受影响

示例:

slave.events.add_input_changed(lambda: print("输入数据变化"))

PDO 数据变化

inputs_mapping 回调

使用 slave.pdo.inputs_mapping(on_changed) 订阅输入 PDO 数据变化。底层检测到输入数据变化时自动触发结构体比较。

回调参数:

@dataclass
class PdoStructChangedEventArgs:
slave_index: int # 从站索引
is_input: bool # 是否为输入 PDO
previous: Any # 变化前的结构体值
current: Any # 当前结构体值
timestamp: float # 变化时间戳

示例:

import ctypes

class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]

def on_input_changed(e):
print(f"位置变化: {e.previous.actual_position} -> {e.current.actual_position}")

input_ref = slave.pdo.inputs_mapping(ServoInput, on_changed=on_input_changed)
与 add_input_changed 的区别
  • add_input_changed() — 仅通知"数据变了",需自行读取数据。开销最低。
  • inputs_mapping(on_changed) — 自动提供变化前后的结构体值。开销略高,但更方便。
  • 两者可以同时使用,底层共享同一个变化检测机制。

FSoE 安全事件

# FSoE 状态变化 (old_state, new_state)
slave.events.add_fsoe_state_changed(lambda old, new:
print(f"FSoE: {old} -> {new}"))

# FSoE 错误 (error_code)
slave.events.add_fsoe_error(lambda code:
print(f"FSoE 错误: 0x{code:04X}"))

# FSoE 进入失效安全
slave.events.add_fsoe_failsafe_triggered(lambda:
print("进入失效安全模式!"))

# FSoE 安全数据更新 (data: bytes)
slave.events.add_fsoe_safe_data_updated(lambda data:
print(f"安全数据: {data.hex()}"))

# FSoE 数据交换 (input_data, output_data)
slave.events.add_fsoe_data_exchange(lambda inp, out:
print(f"输入: {inp.hex()}, 输出: {out.hex()}"))

移除监听器

所有 add_* 方法都有对应的 remove_* 方法:

slave.events.remove_state_changed(my_callback)
slave.events.remove_emergency(my_callback)
slave.events.remove_offline(my_callback)
slave.events.remove_online(my_callback)
slave.events.remove_dc_sync_lost(my_callback)
slave.events.remove_input_changed(my_callback)

线程安全

事件回调在非 UI 线程上触发。更新 UI 时需要线程同步:

import queue
data_queue = queue.Queue()

def on_input_changed():
data_queue.put(("input_changed", slave.index))

slave.events.add_input_changed(on_input_changed)
注意

回调中避免执行耗时操作。如需处理大量数据,将数据放入队列异步处理:

def on_pdo_changed(e):
data_queue.put(e.current) # 快速入队

slave.pdo.inputs_mapping(ServoInput, on_changed=on_pdo_changed)

完整示例

import ctypes
from ethercat import EtherCATMaster, EcState

class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]

with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.set_state(EcState.OP)
master.start()

slave = master[1]

# ===== 从站状态事件 =====
slave.events.add_state_changed(lambda old, new:
print(f"状态: 0x{old:02X} -> 0x{new:02X}"))

slave.events.add_emergency(lambda ec, er, b1, w1, w2:
print(f"紧急消息: 0x{ec:04X}"))

slave.events.add_offline(lambda: print("从站离线"))
slave.events.add_online(lambda: print("从站上线"))

slave.events.add_dc_sync_lost(lambda diff:
print(f"DC 同步丢失: {diff}ns"))

# ===== 输入数据变化 =====
slave.events.add_input_changed(lambda: print("输入数据变化"))

# ===== PDO 数据变化(自动结构体比较) =====
input_ref = slave.pdo.inputs_mapping(ServoInput,
on_changed=lambda e: print(f"位置: {e.current.actual_position}"))

# ===== FSoE 安全事件(协议专属) =====
if slave.fsoe is not None:
slave.events.add_fsoe_state_changed(lambda old, new:
print(f"FSoE: {old} -> {new}"))

slave.events.add_fsoe_error(lambda code:
print(f"FSoE 错误: 0x{code:04X}"))

slave.events.add_fsoe_failsafe_triggered(lambda:
print("进入失效安全模式!"))

slave.events.add_fsoe_safe_data_updated(lambda data:
print(f"安全数据: {data.hex()}"))

slave.events.add_fsoe_data_exchange(lambda inp, out:
print(f"输入: {inp.hex()}, 输出: {out.hex()}"))

import time
time.sleep(30)
master.stop()