跳到主要内容

FSoE 功能安全

FSoE (Functional Safety over EtherCAT) 安全通信接口,在标准 PDO 通道上叠加安全协议层,实现 SIL3/PLe 等级的功能安全通信。

通过 slave.fsoe 访问。从站不支持 FSoE 时为 None

FSoE 设备检测

slave.fsoe 在从站初始化时自动检测。检测过程:

  1. CoE 前置条件 — 从站必须支持 CoE 邮箱协议
  2. 0xF980:01 检测 — 尝试 SDO 读取设备级 FSoE 安全地址,成功则确认支持
  3. 0x9001:02 检测 — 若上步失败,尝试读取 MDP 连接参数(适用于多模块安全设备)

仅检查 CoE 支持是不够的,因为很多非安全设备也支持 CoE。FSoE 设备必须实现特定的安全对象索引才能被正确识别。

SDK 提供三个层级的 FSoE 支持:

  • slave.fsoe — 基础接口,通过从站访问,提供单连接 FSoE 操作
  • FSoEMdp — MDP 多连接管理器,支持单从站上的多个 FSoE 连接
  • FSoEManager — 多从站多连接协调器,提供 bind_safe_io 一步初始化

FSoE 工作流程

FSoE 在 EtherCAT PDO 通道上建立独立的安全连接。主站负责协议管理,从站负责安全逻辑。

状态机流程:

  • Reset — 初始状态,等待主站发起会话
  • Session — 会话建立,交换连接 ID
  • Connection — 连接建立,验证安全地址
  • Parameter — 参数下载阶段(SRA CRC 校验)
  • Data — 正常安全数据交换
  • Failsafe — 失效安全,从站输出安全值
自动状态推进

FSoEManager.bind_safe_io() 一行完成初始化 + 启动数据交换,中间状态(Session → Connection → Parameter)由 SDK 自动推进。

快速开始

推荐方式

使用 FSoEManager.bind_safe_io() 一步完成初始化,无需手动管理状态机。

FSoEManager / FSoEMdp 为高级 API

本页示例中出现的 FSoEManager(master._dll, master.master_index)FSoEMdp(master._dll, master.master_index, slave_index=...) 写法属于内部高级 API, 仅在需要多 FSoE 连接协调或 MDP 多模块场景时使用。常规单从站安全数据读写走 slave.fsoe (基础接口) 即可。 高级 API 的句柄参数后续可能调整, 请勿在生产代码中长期固化该构造形式。

from ethercat import EtherCATMaster, EcState, FSoEManager

master = EtherCATMaster()
master.set_eni("config.deni")
master.build()
master.state = EcState.OP

# 一步绑定安全 IO
mgr = FSoEManager(master._dll, master.master_index)
mgr.bind_safe_io(
slave_index=1,
safe_input_offset=0,
safe_input_size=2,
safe_output_offset=0,
safe_output_size=1,
safety_address=0x0100,
watchdog_ms=100
)

# PDO 回调中读写安全数据
def on_pdo(mi):
data = mgr.read_input_frame(1, 0)
if data:
mgr.write_output_frame(1, 0, bytes([0x01]))

master.on_pdo_cycle(on_pdo, sync=False)

FSoE 与普通 PDO 的区别

FSoE 安全数据不能像普通 PDO 那样直接用 inputs_mapping() 映射。原因在于协议层的差异:

使用体验对比

尽管底层机制不同,bind_safe_io + 数据交换回调的使用体验已经非常接近普通 PDO。

普通 PDO

IOmap 中的数据就是用户数据,结构体直接叠加到内存指针即可

FSoE PDO

IOmap 中存放的是 FSoE 协议帧,用户的安全数据被包裹在帧内部,且需要经过校验

SDK 的 FSoE 协议引擎负责:

  • CRC 校验 — 验证输入帧完整性、计算输出帧 CRC
  • 状态机管理 — Session -> Connection -> Parameter -> Data 自动推进
  • 看门狗 — 监控通信超时,超时自动进入 Failsafe
  • 序列号 — 检测帧丢失和重复

绕过协议引擎直接读写 IOmap 会破坏安全协议完整性,因此 FSoE 必须通过独立缓冲区访问。

属性

属性类型说明
is_initializedbool是否已初始化
stateFSoEState当前 FSoE 状态
in_failsafebool是否处于失效安全模式
watchdog_expiredbool看门狗是否过期
last_errorFSoEError最后的错误代码
configFSoEConnectionConfig / None当前连接配置
statusFSoEConnectionStatus连接状态详情(通信统计)
is_mdp_modebool是否为 MDP 多连接模式
connection_countint连接数量(单连接模式为 1,多连接模式为模块数)
managerFSoEManager / NoneMDP 多连接管理器(非 MDP 模式为 None)

基础接口 (slave.fsoe)

通过 slave.fsoe 访问。

连接管理

slave = master[1]
fsoe = slave.fsoe

# 初始化 FSoE 连接
fsoe.init_connection(
connection_id=0,
safety_address=0x0100,
watchdog_time_ms=100,
safe_input_size=2,
safe_output_size=1
)

# 获取连接状态
status = fsoe.get_status()
if status:
print(f"FSoE 状态: {status['state']}")
print(f"失效安全: {status['in_failsafe']}")

# 请求状态转换
fsoe.request_state(0x104) # DATA 状态

# 重置连接
fsoe.reset()

# 关闭连接
fsoe.close_connection()

安全数据读写

# 写入安全输出
fsoe.write_safe_output(bytes([0x01]))

# 读取安全输入
data = fsoe.read_safe_input()
if data:
print(f"安全输入: {data.hex()}")

参数下载

# 下载安全参数,返回 SRA CRC
sra_crc = fsoe.download_parameters(param_data)
if sra_crc is not None:
print(f"SRA CRC: 0x{sra_crc:08X}")

# 设置失效安全输出
fsoe.set_failsafe_output(bytes([0x00]))

看门狗与错误

# 检查看门狗状态 (True=正常)
if not fsoe.check_watchdog():
print("FSoE 看门狗超时!")

# 获取最后的错误代码
error = fsoe.get_last_error()
if error != 0:
desc = FSoEHelper.get_error_description(FSoEError(error))
print(f"FSoE 错误: {desc}")

# 清除错误
fsoe.clear_error()

状态控制

request_state()

def request_state(self, target_state: int) -> bool

请求 FSoE 状态转换。自动验证状态转换有效性。

参数:

  • target_state (int) — 目标状态(FSoEState 枚举值)

返回值:

  • bool — 成功返回 True

enter_failsafe()

def enter_failsafe(self) -> bool

主动进入失效安全模式。从站将输出安全值(由 set_failsafe_output 预设)。

reset()

def reset(self) -> bool

重置 FSoE 连接到初始状态,重新开始状态机。

close_connection()

def close_connection(self) -> bool

关闭 FSoE 连接,释放资源。

clear_error()

def clear_error(self) -> None

清除 FSoE 错误。清除后可尝试重新建立连接。

参数和配置

download_parameters()

def download_parameters(self, param_data: bytes) -> Optional[int]

下载安全参数。

备注

仅在 Parameter 状态下可调用。参数内容由安全配置工具生成。

参数:

  • param_data (bytes) — 参数数据

返回值:

  • int | None — 成功返回 SRA CRC32 校验值,失败返回 None

set_failsafe_output()

def set_failsafe_output(self, data: bytes) -> bool

预设失效安全时的输出值。当 FSoE 进入 Failsafe 状态时,从站自动切换到此值。

示例:

# 失效安全时所有输出关闭
fsoe.set_failsafe_output(bytes([0x00]))

# 使用 ctypes 结构体
safe_out = SafeDriveOutput()
safe_out.safety_control_word = 0x0001 # 请求 STO(安全扭矩关闭)
fsoe.set_failsafe_output(bytes(safe_out))

FSoEManager(多从站协调器)

FSoEManager 管理多个从站的 FSoE 连接,提供一步绑定和批量操作。

from ethercat import FSoEManager
mgr = FSoEManager(master._dll, master.master_index)

bind_safe_io()

def bind_safe_io(self, slave_index, safe_input_offset, safe_input_size,
safe_output_offset, safe_output_size,
safety_address=0, watchdog_ms=100) -> bool

一步绑定安全 IO。自动完成连接初始化 + 状态推进到 DATA。

bind_mdp_safe_input() / bind_mdp_safe_output()

def bind_mdp_safe_input(self, slave_index, safety_address, input_size,
module_number=0, watchdog_ms=100) -> bool
def bind_mdp_safe_output(self, slave_index, safety_address, output_size,
module_number=0, watchdog_ms=100) -> bool

MDP 模块绑定安全输入/输出。

bind_mdp_drive_axis()

def bind_mdp_drive_axis(self, slave_index, axis_index,
safety_address=0, watchdog_ms=100) -> bool

绑定 MDP 伺服驱动器的安全轴连接。

批量操作

# 检查所有连接是否在 DATA 状态
if mgr.all_in_data_state():
print("所有安全连接正常")

# 检查是否有连接进入失效安全
if mgr.any_in_failsafe():
print("有连接进入失效安全!")

# 所有连接进入失效安全
mgr.enter_all_failsafe()

# 重置所有连接
mgr.reset_all()

# 关闭所有连接
mgr.close_all()

# 获取状态摘要
print(mgr.get_status_summary())

# 连接数量
print(f"连接数: {mgr.connection_count}")

查找连接

# 按安全地址查找
mdp = mgr.find_by_address(0x0100)

# 按连接 ID 查找
mdp = mgr.find_by_connection_id(0)

安全帧读写

# 写入安全输出帧
mgr.write_output_frame(slave_index=1, connection_id=0, frame_data=bytes([0x01]))

# 读取安全输入帧
data = mgr.read_input_frame(slave_index=1, connection_id=0)

DataExchange 回调

从站级别的安全数据交换回调。每个 PDO 周期中,在安全输入刷新后、安全输出提交前自动触发。

时序:

PDO 周期回调
+-- FSoE 前处理
| +-- 对每个 FSoE 从站:
| +-- 刷新安全输入缓冲区 <- 输入已刷新
| +-- 检查状态变化和错误事件
| +-- DataExchange 回调 <- 在此读写安全数据
+-- ProcessDataCyclicSync <- 普通 PDO 读写
+-- FSoE 后处理
+-- 提交安全输出缓冲区 <- 自动提交输出
PDO 周期时序

每个 PDO 周期,系统自动执行:

  1. 周期开始 — 刷新所有 FSoE 从站的安全输入缓冲区、检查状态变化并触发事件
  2. DataExchange 回调 — 从站级别回调,读取输入(已是最新数据),修改输出
  3. PDO 回调 — 主站级别回调(普通 PDO 读写)
  4. 周期结束 — 提交所有 FSoE 从站的安全输出缓冲区

FSoEConnection(单连接实例)

MDP 模式下的单个 FSoE 连接实例。每个连接拥有独立的状态机、看门狗和安全数据缓冲区。

通过 fsoe.get_connection(index)mgr.find_by_connection_id(id) 访问。

属性类型说明
indexint连接索引(0-based)
is_initializedbool是否已初始化
stateFSoEState当前 FSoE 状态
in_failsafebool是否处于失效安全模式
watchdog_expiredbool看门狗是否过期
last_errorFSoEError最后的错误代码
configFSoEConnectionConfig / None连接配置
module_profileFSoEModuleProfile模块配置文件类型
axis_numberint轴编号(仅驱动连接)

连接级别方法

# 每个连接可独立控制
conn = fsoe.get_connection(0)
conn.request_state(0x104) # 请求状态转换
conn.enter_failsafe() # 进入失效安全
conn.reset() # 重置连接
conn.clear_error() # 清除错误
conn.close() # 关闭连接
conn.download_parameters(data) # 下载安全参数
conn.set_failsafe_output(data) # 设置失效安全输出

FSoEConnectionMode

class FSoEConnectionMode(IntEnum):
SINGLE = 0 # 单连接模式 - 所有模块共用一个 FSoE 连接
MULTIPLE = 1 # 多连接模式 - 每个模块独立 FSoE 连接
选择指南
  • 单连接 (bind_safe_io) — 简单设备、单轴驱动器
  • 多连接 (bind_mdp_safe_io) — 模块需要独立监控/恢复时(如多轴驱动器各轴独立安全连接)

MDP 多连接模式 (FSoEMdp)

FSoEMdp 管理单个从站上的多个独立 FSoE 连接,适用于安全 I/O 模块等支持 MDP 的从站。

from ethercat import FSoEMdp

mdp = FSoEMdp(master._dll, master.master_index, slave_index=1)

连接探测与管理

# 探测可用连接
conn_count = mdp.detect_connections()
print(f"支持 {conn_count} 个 FSoE 连接")

# 获取设备安全地址
addr = mdp.get_device_address()
print(f"设备安全地址: 0x{addr:04X}")

# 获取从站连接数量
total = mdp.get_slave_connection_count()

连接初始化

# 初始化连接(支持 MDP 扩展参数)
mdp.init_connection(
connection_id=0,
safety_address=0x0100,
watchdog_time_ms=100,
safe_input_size=2,
safe_output_size=1,
pdo_input_offset=0, # PDO 输入偏移
pdo_output_offset=0, # PDO 输出偏移
module_number=0, # MDP 模块编号
module_profile=0, # MDP 模块配置文件
axis_number=0, # 驱动器轴号
connection_type=0 # 连接类型
)

# 各连接独立操作
mdp.write_safe_output(connection_id=0, data=bytes([0x01]))
data = mdp.read_safe_input(connection_id=0)
mdp.request_state(connection_id=0, target_state=0x104)
mdp.close_connection(connection_id=0)

模块诊断

# 读取模块通信参数
comm_param = mdp.get_module_comm_param(connection_id=0)

# 读取模块诊断
diag = mdp.get_module_diagnosis(connection_id=0)
if diag:
state, diagnosis = diag
print(f"连接状态: {state}, 诊断码: 0x{diagnosis:04X}")

安全数据读写

read_safe_input() / write_safe_output() 读写原始字节缓冲区。用户按设备厂商提供的安全 PDO 布局解析或拼装字节,常用方式包括:

  • int.from_bytes() / to_bytes() 处理标量字段
  • struct.pack / struct.unpack 处理多字段记录
  • ctypes.Structure 定义类型化布局

示例:

data = fsoe.read_safe_input()
if data:
input_bits = data[0]
ch0_safe = bool(input_bits & 0x01)

fsoe.write_safe_output(bytes([0x01])) # 通道 0 输出

枚举与数据类

FSoEState

class FSoEState(IntEnum):
RESET = 0x100 # 初始/重置状态
SESSION = 0x101 # 会话建立
CONNECTION = 0x102 # 连接建立
PARAMETER = 0x103 # 参数下载
DATA = 0x104 # 数据交换(正常工作)
FAILSAFE = 0x105 # 失效安全

FSoEError

class FSoEError(IntEnum):
NONE = 0x0000 # 无错误
WRONG_COMMAND = 0x0001 # 错误的命令
UNKNOWN_COMMAND = 0x0002 # 未知命令
WRONG_CONNECTION_ID = 0x0003 # 错误的连接 ID
CRC_ERROR = 0x0004 # CRC 校验失败
WATCHDOG = 0x0005 # 看门狗超时
WRONG_ADDRESS = 0x0006 # 错误的 FSoE 地址
WRONG_DATA = 0x0007 # 无效数据
COMM_PARAM_LENGTH = 0x0008 # 通信参数长度错误
COMM_PARAM = 0x0009 # 通信参数错误
APP_PARAM_LENGTH = 0x000A # 应用参数长度错误
APP_PARAM = 0x000B # 应用参数错误
UNEXPECTED_SESSION = 0x000C # 意外的会话命令
FAILSAFE_DATA = 0x000D # 接收到失效安全数据
NOT_INITIALIZED = 0x0100 # 未初始化
MAX_CONNECTIONS = 0x0101 # 超出最大连接数

FSoEFailsafeReason

class FSoEFailsafeReason(IntEnum):
WATCHDOG_TIMEOUT = 0 # 看门狗超时
CRC_ERROR = 1 # CRC 校验错误
COMMUNICATION_ERROR = 2 # 通信错误
APPLICATION_REQUEST = 3 # 应用请求
SLAVE_REQUEST = 4 # 从站请求
MASTER_REQUEST = 5 # 主站请求
RECOVERY_TO_DATA = 6 # 恢复到数据交换

FSoEModuleProfile

class FSoEModuleProfile(IntEnum):
DIGITAL_INPUT = 190 # 数字输入模块
DIGITAL_IN_OUT = 195 # 数字输入输出模块
DIGITAL_OUTPUT = 290 # 数字输出模块
DRIVE_CONNECTION = 790 # 伺服驱动器连接
MASTER = 6900 # 主站模块

FSoEConnectionConfig

@dataclass
class FSoEConnectionConfig:
connection_id: int = 0 # 连接 ID
safety_address: int = 0 # 安全地址
watchdog_time_ms: int = 100 # 看门狗超时 (ms)
safe_input_size: int = 0 # 安全输入大小 (字节)
safe_output_size: int = 0 # 安全输出大小 (字节)
pdo_input_offset: int = 0 # PDO 输入偏移
pdo_output_offset: int = 0 # PDO 输出偏移

FSoEConnectionStatus

@dataclass
class FSoEConnectionStatus:
state: FSoEState = FSoEState.RESET # 当前状态
last_error: FSoEError = FSoEError.NONE # 最后错误
error_count: int = 0 # 错误计数
frames_sent: int = 0 # 发送帧数
frames_received: int = 0 # 接收帧数
crc_errors: int = 0 # CRC 错误数
watchdog_errors: int = 0 # 看门狗错误数
watchdog_expired: bool = False # 看门狗是否过期
in_failsafe: bool = False # 是否在失效安全
is_initialized: bool = False # 是否已初始化

事件参数

@dataclass
class FSoEStateChangedEventArgs:
slave_index: int # 从站索引
connection_index: int # 连接索引
old_state: FSoEState # 变化前状态
new_state: FSoEState # 变化后状态

@dataclass
class FSoEErrorEventArgs:
slave_index: int # 从站索引
connection_index: int # 连接索引
error: FSoEError # 错误代码
current_state: FSoEState # 当前状态
@property
def description(self) -> str # 错误描述文本

@dataclass
class FSoEFailsafeEventArgs:
slave_index: int # 从站索引
connection_index: int # 连接索引
reason: FSoEFailsafeReason # 失效安全原因
entering_failsafe: bool # True=进入, False=恢复

@dataclass
class FSoEDataUpdatedEventArgs:
slave_index: int # 从站索引
connection_index: int # 连接索引
is_input: bool # 是否为输入数据
data: bytes # 安全数据

从站事件

从站级 FSoE 事件通过 slave.events 注册:

slave = master[1]

# FSoE 状态变化
slave.events.add_fsoe_state_changed(lambda old, new:
print(f"FSoE 状态: {old} -> {new}"))

# FSoE 错误
slave.events.add_fsoe_error(lambda code:
print(f"FSoE 错误: 0x{code:04X}"))

# FSoE 进入/恢复失效安全
slave.events.add_fsoe_failsafe_triggered(lambda:
print("进入失效安全模式!"))

# FSoE 安全数据更新
slave.events.add_fsoe_safe_data_updated(lambda data:
print(f"安全数据: {data.hex()}"))

# FSoE 数据交换
slave.events.add_fsoe_data_exchange(lambda inp, out:
print(f"输入: {inp.hex()}, 输出: {out.hex()}"))

工具类

FSoEHelper

from ethercat import FSoEHelper

# 检查状态转换是否合法
valid = FSoEHelper.is_valid_state_transition(FSoEState.RESET, FSoEState.SESSION)

# 获取错误描述
desc = FSoEHelper.get_error_description(FSoEError.WATCHDOG)

# 解析命令字节
cmd = FSoEHelper.get_command_type(0x05)
toggle = FSoEHelper.get_toggle_bit(0x85)

CRC 校验

from ethercat import fsoe_crc16, fsoe_crc16_fast

data = bytes([0x01, 0x02, 0x03])
crc = fsoe_crc16(data) # 标准计算
crc_fast = fsoe_crc16_fast(data) # 查找表加速

is_connection_id_available()

from ethercat.slave.fsoe import is_connection_id_available

def is_connection_id_available(conn_id: int) -> bool

模块级函数, 校验指定 ConnectionID 是否可用 (未被其他活动 FSoE 连接占用)。

参数:

  • conn_id (int) — 待校验的连接 ID, 合法范围 [1, 65535] (0 保留)

返回值:

  • boolTrue 可用 (available); False 已占用 / 非法
ETG.5120 §5.2.3

ConnectionID = 0 为保留值 (不可分配), 合法范围 [1, 65535]。 同一从站网络内每个 FSoE 连接必须使用唯一 ConnID, 重复会导致 WRONG_CONNECTION_ID (FSoEError 0x0003)。

示例 — 初始化前占用检查:

from ethercat.slave.fsoe import is_connection_id_available
from ethercat import FSoEManager

conn_id = 0x1234
if not is_connection_id_available(conn_id):
print(f"ConnID 0x{conn_id:04X} 已被占用, 请选择其他 ID")
else:
mgr = FSoEManager(master._dll, master.master_index)
mgr.bind_safe_io(
slave_index=1,
safe_input_offset=0, safe_input_size=2,
safe_output_offset=0, safe_output_size=1,
safety_address=0x0100, watchdog_ms=100,
# 部分 API 在底层使用 conn_id 参数
)

示例 — 动态分配可用 ID:

def allocate_conn_id(start: int = 1) -> int:
"""从 start 开始扫描首个可用 ConnID"""
for cid in range(start, 0x10000):
if is_connection_id_available(cid):
return cid
raise RuntimeError("无可用 FSoE ConnectionID")

my_id = allocate_conn_id(0x1000)

完整示例

单连接安全 IO

from ethercat import EtherCATMaster, EcState

with EtherCATMaster() as master:
master.set_eni("config.deni")
master.build()
master.state = EcState.OP

slave = master[1]
fsoe = slave.fsoe

if fsoe is not None:
# 初始化连接
fsoe.init_connection(
connection_id=0,
safety_address=0x0100,
watchdog_time_ms=100,
safe_input_size=2,
safe_output_size=1
)

# 设置失效安全输出
fsoe.set_failsafe_output(bytes([0x00]))

# 请求进入数据交换状态
fsoe.request_state(0x104)

# 注册事件
slave.events.add_fsoe_error(lambda code:
print(f"FSoE 错误: 0x{code:04X}"))

# PDO 回调中读写安全数据
def on_pdo(mi):
data = fsoe.read_safe_input()
if data:
ch0 = (data[0] & 0x01) != 0
fsoe.write_safe_output(bytes([0x01 if ch0 else 0x00]))

master.on_pdo_cycle(on_pdo, sync=False)

import time
time.sleep(10)
fsoe.close_connection()

FSoEManager 一步绑定

from ethercat import EtherCATMaster, EcState, FSoEManager

master = EtherCATMaster()
master.set_eni("config.deni")
master.build()
master.state = EcState.OP

mgr = FSoEManager(master._dll, master.master_index)

# 一步绑定(自动 Session→Connection→Parameter→Data)
mgr.bind_safe_io(
slave_index=1,
safe_input_offset=0, safe_input_size=2,
safe_output_offset=0, safe_output_size=1,
safety_address=0x0100, watchdog_ms=100
)

# MDP 伺服驱动器安全轴绑定
mgr.bind_mdp_drive_axis(slave_index=2, axis_index=0,
safety_address=0x0200, watchdog_ms=100)

# 检查所有连接状态
print(mgr.get_status_summary())
if mgr.all_in_data_state():
print("所有安全连接已进入数据交换状态")

# PDO 回调
def on_pdo(mi):
data = mgr.read_input_frame(1, 0)
if data:
mgr.write_output_frame(1, 0, bytes([0x01]))

master.on_pdo_cycle(on_pdo, sync=False)

import time
time.sleep(30)
mgr.close_all()
master.close()

安全驱动器(带类型化读写)

import ctypes
from ethercat import EtherCATMaster, EcState, FSoEManager

class SafeDriveInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("safety_status_word", ctypes.c_uint16),
("safe_actual_position", ctypes.c_int32),
("safe_actual_velocity", ctypes.c_int32),
]

class SafeDriveOutput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("safety_control_word", ctypes.c_uint16),
]

master = EtherCATMaster()
master.set_eni("config.deni")
master.build()
master.state = EcState.OP

mgr = FSoEManager(master._dll, master.master_index)

# 一步绑定安全驱动器(10 字节输入 + 2 字节输出)
mgr.bind_safe_io(
slave_index=1,
safe_input_offset=0,
safe_input_size=ctypes.sizeof(SafeDriveInput), # 10
safe_output_offset=0,
safe_output_size=ctypes.sizeof(SafeDriveOutput), # 2
safety_address=0x0100,
watchdog_ms=100
)

# PDO 回调中进行安全驱动控制
def on_pdo(mi):
data = mgr.read_input_frame(1, 0)
if data and len(data) >= ctypes.sizeof(SafeDriveInput):
drive_in = SafeDriveInput.from_buffer_copy(data)

drive_out = SafeDriveOutput()
if drive_in.safety_status_word & 0x0100: # Bit8: 安全故障
drive_out.safety_control_word = 0x0080 # 确认故障
print(f"安全故障: 位置={drive_in.safe_actual_position}")
else:
drive_out.safety_control_word = 0x0000 # 清除 STO

mgr.write_output_frame(1, 0, bytes(drive_out))

# 失效安全事件
slave = master[1]
slave.events.add_fsoe_failsafe_triggered(lambda:
print("驱动器安全停止!"))

master.on_pdo_cycle(on_pdo, sync=False)

import time
time.sleep(30)
mgr.close_all()
master.close()

MDP 多连接

from ethercat import FSoEMdp

mdp = FSoEMdp(master._dll, master.master_index, slave_index=1)

# 探测并初始化所有连接
conn_count = mdp.detect_connections()
print(f"支持 {conn_count} 个 FSoE 连接")

# 获取设备安全地址
addr = mdp.get_device_address()
print(f"设备安全地址: 0x{addr:04X}")

for i in range(conn_count):
mdp.init_connection(
connection_id=i,
safety_address=0x0100 + i,
watchdog_time_ms=100,
safe_input_size=2,
safe_output_size=1
)

# 查询各连接状态
for i in range(conn_count):
status = mdp.get_status(i)
if status:
print(f"连接 {i}: 状态={status['state']}, 失效安全={status['in_failsafe']}")

# 模块诊断
for i in range(conn_count):
diag = mdp.get_module_diagnosis(i)
if diag:
state, diagnosis = diag
print(f"连接 {i}: 诊断码=0x{diagnosis:04X}")

# 关闭
for i in range(conn_count):
mdp.close_connection(i)

MDP 多轴安全驱动器

from ethercat import EtherCATMaster, EcState, FSoEManager

master = EtherCATMaster()
master.set_eni("config.deni")
master.build()
master.state = EcState.OP

mgr = FSoEManager(master._dll, master.master_index)

# 绑定 2 个轴(每轴独立 FSoE 连接)
mgr.bind_mdp_drive_axis(slave_index=1, axis_index=0,
safety_address=0x0100, watchdog_ms=100)
mgr.bind_mdp_drive_axis(slave_index=1, axis_index=1,
safety_address=0x0200, watchdog_ms=100)

# 检查所有连接状态
print(mgr.get_status_summary())
if mgr.all_in_data_state():
print("所有安全连接已进入数据交换状态")

# PDO 回调
def on_pdo(mi):
# 各轴独立读写安全数据
for axis in range(2):
data = mgr.read_input_frame(1, axis)
if data:
# 处理安全数据...
pass

master.on_pdo_cycle(on_pdo, sync=False)

# 事件监控: 每轴独立
slave = master[1]
slave.events.add_fsoe_failsafe_triggered(lambda:
print("轴失效安全!"))

import time
time.sleep(30)
mgr.close_all()
master.close()