属性与状态机
属性
| 属性 | 类型 | 说明 |
|---|---|---|
| master_index | int | 主站实例索引 |
| master_number | int | 主站编号(master_index 的别名) |
| slave_count | int | 在线从站数量 |
| slaves | List[Slave] | 从站列表(1-based 索引映射) |
| is_disposed | bool | 主站是否已被释放 |
| link_status | EcLinkState | 网络链路状态 |
| link_state | EcLinkState | 链路状态 (link_status 别名) |
| ring_mode | RingMode | 冗余运行模式 |
| secondary_link_ok | bool | 冗余链路是否正常 |
| error_code | EcALState | AL Status Code,状态切换失败时的错误原因 |
| groups | _GroupAccessor | 组索引器 |
| active_group_count | int | 活跃组数量 |
| obytes | int | 总输出字节数 |
| ibytes | int | 总输入字节数 |
| obits | int | 总输出位数 |
| ibits | int | 总输入位数 |
| has_dc | bool | 是否有 DC 从站 |
| loop_cycle | int | PDO 交换周期时间(纳秒,读写) |
| dll_version | Optional[dict] | 原生库版本信息 |
| identity | Optional[dict] | 主站身份信息(ETG.1510) |
| diagnostics | Optional[dict] | ETG.1510 诊断数据 |
| diagnostics_info | MasterDiagnosticsInfo | 高级诊断信息对象 |
EcLinkState
from enum import IntEnum
class EcLinkState(IntEnum):
DISCONNECTED = 0 # 无连接
CONNECTED = 1 # 主网口和冗余网口都连接
PRIMARY_ONLY = 3 # 仅主网口连接
SECONDARY_ONLY = 4 # 仅冗余网口连接
状态机管理
state
@property
def state(self) -> Optional[EcState]
获取主站当前 EtherCAT 状态(只读)。
EcState 枚举:
from enum import IntEnum
class EcState(IntEnum):
NONE = 0x00
INIT = 0x01
PRE_OP = 0x02
BOOT = 0x03
SAFE_OP = 0x04
OP = 0x08
ACK = 0x10
ERROR = 0x10
set_state()
def set_state(self, state: EcState, return_error: bool = False)
切换 EtherCAT 主站状态。这是唯一的公开同步状态机 API,内部完成跨级链式转换 (INIT → PRE_OP → SAFE_OP → OP)、每步执行 Before/After 启动参数、SafeOp/OP 进入后启动 PDO 线程,每步默认 5 秒超时。
SDK 内部对每一级状态切换自动执行最多 3 次重试,每次失败后等待约 1.5 秒后再次尝试,用于吸收硬件自适应、链路抖动等瞬态问题。应用层无需自行包装 for _ in range(3): master.set_state(...) 循环,重复重试只会拉长失败路径的总耗时。
切换到 SAFE_OP 时,SDK 按 ETG 标准为非 DC 从站自动配置同步循环计数阈值与 SyncManager 同步类型 (FreeRun 兜底),应用层无需手动写。
DC 从站跳过此自动配置以避免与 DC 时基冲突。如需自定义同步策略,应在切到 SAFE_OP 之后再覆盖。
| 参数 | 类型 | 默认 | 说明 |
|---|---|---|---|
| state | EcState | — | 目标状态 (INIT / PRE_OP / SAFE_OP / OP) |
| return_error | bool | False | True 时返回 (bool, str | None) 元组 |
返回值:
| return_error | 返回类型 | 说明 |
|---|---|---|
| False (默认) | bool | 是否成功 |
| True | tuple[bool, str | None] | 失败时第二项为中文错误描述,成功时为 None |
示例:
from ethercat import EcState
# 基本用法 (只关心成功与否)
if not master.set_state(EcState.OP):
print(f"状态切换失败, 错误码: {master.error_code}")
set_state(state, return_error=True)
ok, err = master.set_state(state, return_error=True)
带错误信息的形式。返回 (ok, err_msg) 元组,便于日志或上报。
示例:
ok, err = master.set_state(EcState.OP, return_error=True)
if not ok:
print(f"状态切换失败: {err}")
set_state_async()
async def set_state_async(self, state: EcState) -> bool
异步版状态切换,不阻塞调用线程。内部通过 asyncio.run_in_executor 委托线程池执行 set_state()。仅返回布尔值;如需错误信息请使用同步版 set_state(state, return_error=True)。
| 参数 | 类型 | 说明 |
|---|---|---|
| state | EcState | 目标状态 |
返回值:
| 类型 | 说明 |
|---|---|
| bool | 是否成功 |
示例:
import asyncio
from ethercat import EcState
async def main():
ok = await master.set_state_async(EcState.OP)
if not ok:
print(f"状态切换失败, 错误码: {master.error_code}")
asyncio.run(main())
validate()
def validate(self) -> ValidationResult
Build 前配置预检查。检查主站配置是否满足最低要求,在调用 build() 之前使用。
返回值:
ValidationResult— 包含is_valid(bool) 和errors(List[str])
示例:
result = master.validate()
if not result.is_valid:
for err in result.errors:
print(f"配置错误: {err}")
else:
master.build()
start() / stop()
def start(self) -> None
def stop(self) -> None
启动/停止 PDO 循环线程和状态监控线程。
示例:
master.set_state(EcState.OP)
master.start()
# ... 运行 ...
master.stop()
过程数据看门狗
set_all_watchdog()
def set_all_watchdog(self, timeout_ms: int) -> int
批量设置所有从站的过程数据看门狗超时。
参数:
timeout_ms(int) — 超时时间(毫秒),0 = 禁用
返回值:
int— 成功设置的从站数量
示例:
count = master.set_all_watchdog(100)
print(f"已设置 {count} 个从站看门狗超时为 100ms")
set_all_pdi_watchdog()
def set_all_pdi_watchdog(self, timeout_ms: int) -> int
批量设置所有从站的 PDI 看门狗超时, 用于检测 ESC 与从站 MCU 之间的接口异常。
参数:
timeout_ms(int) — 超时时间 (毫秒), 0 = 禁用
返回值:
int— 成功设置的从站数量
诊断控制方法
reset_diagnostics()
def reset_diagnostics(self) -> None
清零所有诊断计数器 (帧丢失/抖动/WKC/RX 错误等), 通常在长时间观察前重置。
reset_slave_port_error_counters()
def reset_slave_port_error_counters(self, slave_index: int) -> bool
清零指定从站 4 个端口的 ESC 错误计数器 (0x300/0x302/0x304/0x306, 用于物理层链路诊断)。
参数:
slave_index(int) — 从站编号 (1-based)
返回值:
bool— 成功返回True
master.lock_iomap() / unlock_iomap() 可在多轴批量原子写入时禁用 mutex_protection 后手动加锁; get_expected_wkc() / set_expected_wkc() 用于可选从站场景下调整期望 WKC; record_cycle_time() / update_diagnostics_snapshot() 配合 diagnostics_info 读取实时性能指标。
热插拔自修复
在断电重插 / 更换从站的场景下,SDK 自动识别身份不符并进入保护状态,避免错误设备被误纳入控制循环。事件流程见 slave_identity_mismatch 事件。
acknowledge_slave_replacement()
def acknowledge_slave_replacement(self, slave_num: int) -> bool
用户确认从站替换完毕,触发 EtherCAT 识别状态机重新探测该从站。
调用时机: 订阅 master.events.add_slave_identity_mismatch() 接收到身份不符报警后,操作员检查 / 更换设备完毕,调用本方法让 SDK 重新检测。
行为:
- 若身份已纠正(换回正确设备 / 同型号升级 Revision)→ 自动恢复并触发
slave_online事件 - 若身份仍不匹配 → 再次触发
slave_identity_mismatch,回到IDENT_REJECTED状态
参数:
slave_num(int) — 从站编号(1-based,与配置一致)
返回值:
bool—True=已接受并复位 FSM;False=参数无效,或从站当前不在IDENT_REJECTED/FAILED状态(收到slave_identity_mismatch事件之前调用返回False)
在 slave_identity_mismatch 事件触发之前调用本方法无效。从站未进入保护状态时 SDK 会持续正常探测,无需手动确认。
示例:
from ethercat import MasterEvents
events = MasterEvents(master)
def on_identity_mismatch(args):
print(f"从站 {args.slave_index} 身份不符: "
f"期望 Vendor=0x{args.expected_vendor:08X}, "
f"实际 Vendor=0x{args.actual_vendor:08X}")
# UI 弹窗: "请检查从站 N 的设备, 换回正确型号后点击确认"
if show_replacement_dialog(args.slave_index):
ok = master.acknowledge_slave_replacement(args.slave_index)
if not ok:
print("确认失败: 从站不在 IDENT_REJECTED 状态")
events.add_slave_identity_mismatch(on_identity_mismatch)
slave_offline— 从站断电 / 断线(身份仍匹配),自动恢复并触发slave_onlineslave_identity_mismatch— 从站身份变了(换错设备 / 换同型号旧版本固件),需手动调acknowledge_slave_replacement()
资源释放
close()
def close(self) -> None
关闭主站并释放资源 (对应 C# Close / Dispose)。等价于 dispose()。
内部按以下顺序关闭:
- 标记主站索引失效, 阻止后续回调访问
- 注销全局回调注册表 (防 GC 竞争)
- 释放
events / diagnostics_info / config / master_od / mailbox_gateway子对象 - 调用 DLL
EcClose停止所有 native 线程, 失败时退化到Stop+Dispose - 清空回调与从站缓存
master = EtherCATMaster()
try:
master.set_network(adapter)
master.set_state(EcState.OP)
# ... 运行 ...
finally:
master.close()
# 或使用上下文管理器自动关闭
with EtherCATMaster() as master:
master.set_network(adapter)
master.set_state(EcState.OP)
dispose()
def dispose(self) -> None
close() 的别名, 兼容 C# Dispose 命名习惯。
拓扑刷新
rebuild_topology()
def rebuild_topology(self) -> None
重建从站拓扑访问器 (对应 C# RebuildTopology)。在从站列表发生变化 (热插拔 / 重新扫描) 后调用, 强制 slave_topology 视图重新派生父子关系与端口映射。
# 重新扫描后刷新拓扑视图
master.set_network(adapter)
master.rebuild_topology()
for node in master.slave_topology:
print(f"slave {node.index} parent={node.parent}")
topology / topology_children() / topology_roots()
@property
def topology(self) -> List[dict]
def topology_children(self, parent_index: int, max_results: int = 64) -> List[int]
def topology_roots(self, max_results: int = 64) -> List[int]
构建网络拓扑节点列表; 查询指定父从站的子节点 / 网络根节点 (直连主站) 列表。返回的字典包含 slave_index / config_addr / parent_index / entry_port / active_ports / topology / port_type。
DC 时钟扩展属性
master_dc_time
@property
def master_dc_time(self) -> int
最近一次 FRMW 取回的 64 位 DC 系统时间, 单位纳秒, 纪元 2000-01-01 00:00:00 UTC。DC 未激活或 DLL 未导出时返回 0。
reference_clock_slave_index
@property
def reference_clock_slave_index(self) -> int
参考时钟从站索引 (1-based)。返回 0 表示无 DC 从站或 DLL 未导出。
主站组管理
active_group_count
@property
def active_group_count(self) -> int
活跃组数量 (映射后有效)。对齐 C# ActiveGroupCount。
get_group_expected_wkc()
def get_group_expected_wkc(self, group: int) -> int
读取指定组的期望 WKC (对齐 C# GetGroupExpectedWKC)。
get_expected_wkc() / set_expected_wkc()
def get_expected_wkc(self) -> int
def set_expected_wkc(self, expected_wkc: int) -> None
读取 / 覆写全局期望 WKC, 用于含可选从站的网络场景手动调整。
# 网络含 1 个 optional 从站, 该从站不在线时
master.set_expected_wkc(master.get_expected_wkc() - 3)
IOmap 锁
lock_iomap() / unlock_iomap()
def lock_iomap(self) -> None
def unlock_iomap(self) -> None
手动加 / 解 IOmap 互斥锁, 用于在禁用 mutex_protection 时按批次原子写入多组从站输出。
master.mutex_protection = False
master.lock_iomap()
try:
for slave in master.slaves:
slave.set_output_value("ControlWord", 0x0F)
finally:
master.unlock_iomap()
状态机精细控制 (state_manager)
主站还提供 MasterStateManager 子对象, 暴露 DLL 层的细粒度状态切换。常规场景请直接使用 master.set_state()。
from ethercat.master.state import MasterStateManager
mgr = MasterStateManager(master._dll, master.master_index)
mgr.set_state(EcState.PRE_OP, timeout_ms=3000) # 单步切换
mgr.set_state_sequence(EcState.OP, timeout_ms=10000) # 链式切换 + 启动参数
mgr.set_slave_state(2, EcState.SAFE_OP) # 单从站切换
slave_state = mgr.get_slave_state(2)
al_code = mgr.get_slave_al_status_code(2)
verify_startup_configuration()
def verify_startup_configuration(self) -> dict
启动配置预检查 (对应 C# VerifyStartupConfiguration), 综合检查主站状态、链路状态、各从站 AL 状态、ETG.1500 配置, 返回结果字典。
返回值:
{
'valid': bool, # 整体是否有效
'slave_count': int, # 在线从站数
'issues': list[str], # 问题描述列表 (中文)
}
示例:
result = master.state_manager.verify_startup_configuration() \
if hasattr(master, 'state_manager') else \
MasterStateManager(master._dll, master.master_index) \
.verify_startup_configuration()
if not result['valid']:
for issue in result['issues']:
print(f"配置问题: {issue}")
MasterStateManager.close()
def close(self) -> None
关闭主站并释放资源, 内部依次调用 AbortNetwork → Stop → Dispose。EtherCATMaster.close() 已经调用 native 层 EcClose, 通常不需要单独调用本方法。
ESM 超时常量 (EsmTimeouts)
from ethercat import EsmTimeouts
ETG.1020 状态转换默认超时 (毫秒), 与 C# EsmTimeouts 一致。
| 字段 | 默认值 (ms) | 说明 |
|---|---|---|
| INIT_TO_PREOP | 3000 | Init → PreOp |
| PREOP_TO_SAFEOP | 10000 | PreOp → SafeOp (含 PDO 映射) |
| SAFEOP_TO_OP | 3000 | SafeOp → OP |
| OP_TO_SAFEOP | 200 | OP → SafeOp |
| SAFEOP_TO_PREOP | 200 | SafeOp → PreOp |
| PREOP_TO_INIT | 200 | PreOp → Init |
| TO_BOOT | 3000 | 任意状态 → Boot |
| BOOT_TO_INIT | 3000 | Boot → Init |
| SDO_TIMEOUT | 3000 | SDO 通信 |
| MAILBOX_TIMEOUT | 5000 | 邮箱通信 |
| STATE_STABILIZE_DELAY | 100 | 状态切换后稳定等待 |
EsmTimeouts.get_transition_timeout()
@staticmethod
def get_transition_timeout(current_state: int, target_state: int) -> int
按状态值 (EcState 整型) 查表返回推荐超时 (ms)。未在表中的转换返回兜底 3000ms。
from ethercat import EcState, EsmTimeouts
t = EsmTimeouts.get_transition_timeout(EcState.PRE_OP, EcState.SAFE_OP)
print(f"PreOp → SafeOp 推荐超时: {t}ms") # 10000
AL 状态码分类
ALErrorCategory
from ethercat import ALErrorCategory
class ALErrorCategory(IntEnum):
NONE = 0 # 无错误
TRANSIENT = 1 # 临时错误 - 可通过重试自动恢复
CONFIGURATION = 2 # 配置错误 - 需要重新配置
HARDWARE = 3 # 硬件错误 - 需要用户干预
UNKNOWN = 4 # 未知错误
classify_al_error()
from ethercat import classify_al_error
def classify_al_error(al_status_code: int) -> ALErrorCategory
根据 AL Status Code 判断错误类别, 参考 ETG.1020 标准。临时错误可重试, 配置错误需重新配置, 硬件错误需用户干预。
from ethercat import EcState, ALErrorCategory, classify_al_error
if not master.set_state(EcState.OP):
code = int(master.error_code)
cat = classify_al_error(code)
if cat == ALErrorCategory.TRANSIENT:
time.sleep(0.5)
master.set_state(EcState.OP) # 重试
elif cat == ALErrorCategory.CONFIGURATION:
print(f"配置错误 0x{code:04X}, 请检查 ENI / startup 参数")
elif cat == ALErrorCategory.HARDWARE:
print(f"硬件错误 0x{code:04X}, 请检查从站 / 链路")
ALErrorClassifier
from ethercat.master import ALErrorClassifier
ALErrorClassifier.classify(0x001B) # "启动错误"
ALErrorClassifier.classify(0x0042) # "邮箱错误"
按 AL Status Code 高字节范围返回中文分类描述, 适合 HMI / 日志展示, 不参与重试逻辑。返回值见下表:
| 范围 | 分类 |
|---|---|
| 0x0000 | 无错误 |
| 0x0001 - 0x000F | 通用错误 |
| 0x0010 - 0x001F | 启动错误 |
| 0x0020 - 0x002F | 邮箱错误 |
| 0x0030 - 0x003F | 看门狗错误 |
| 0x0040 - 0x004F | 同步错误 |
| 0x0050 - 0x005F | 分布式时钟错误 |
| 0x0060 - 0x006F | MBX/EoE 错误 |
classify_al_error() 的差异classify_al_error()— 返回ALErrorCategory枚举, 用于编程决策 (是否重试)ALErrorClassifier.classify()— 返回中文字符串, 用于人机展示
应用层一般两者并用: 用枚举决定重试逻辑, 用字符串展示给操作员。
静态工具
EtherCATMaster.emergency_cleanup()
@staticmethod
def emergency_cleanup(dll_path: Optional[str] = None) -> None
紧急释放所有主站资源 (对应 C# EmergencyCleanup)。在进程崩溃 / 异常退出兜底场景使用, 直接关闭所有网卡句柄, 无需主站实例。
import atexit
from ethercat import EtherCATMaster
atexit.register(EtherCATMaster.emergency_cleanup)
EtherCATMaster.get_active_instance_count() / get_max_instance_count()
@staticmethod
def get_active_instance_count(dll_path: Optional[str] = None) -> int
@staticmethod
def get_max_instance_count(dll_path: Optional[str] = None) -> int
查询当前活跃主站实例数 / 最大可创建主站实例数。最大值由可用隔离核心数和驱动上限决定:
- Linux: 由
install.sh的isolcpus参数决定 - Windows: 总核心数 - 1 (驱动运行时隔离)
print(f"活跃实例: {EtherCATMaster.get_active_instance_count()}")
print(f"最大可创建: {EtherCATMaster.get_max_instance_count()}")
EtherCATMaster.is_adapter_available()
@staticmethod
def is_adapter_available(adapter: str, dll_path: Optional[str] = None) -> bool
检查指定网卡是否可用 (NDIS 协议未占用且能 BIND), 用于 UI 选网卡前的预检。
EtherCATMaster.validate_timer_accuracy()
@staticmethod
def validate_timer_accuracy(expected_usec: int, tolerance_usec: int,
dll_path: Optional[str] = None) -> int
验证 OS 高精度定时器实际精度。返回 0 表示在容差内, 非零表示超出容差; Windows 上若返回非零, 多半是高精度定时器服务未启用或 BIOS 关闭了 HPET。
日志开关 (静态方法)
EtherCATMaster.set_pdo_logging(True) # 开 PDO 帧级日志
EtherCATMaster.set_mailbox_logging(True) # 开邮箱日志
EtherCATMaster.set_debug_logging(True) # 开调试日志
或使用模块级函数 (对齐 C# Static.Other):
from ethercat.statics import (
initialize_logging, enable_pdo_logging,
enable_mailbox_logging, enable_debug_logging,
)
from ethercat.utils import DarraCoreDLL
dll = DarraCoreDLL()
initialize_logging(dll)
enable_pdo_logging(dll, True)
enable_mailbox_logging(dll, True)
- PDO 日志 — 每周期一行, 高频时显著拖慢循环, 仅调试期使用
- 邮箱日志 — 每次 SDO/FoE/EoE 交互一行, 一般可常开
- 调试日志 — 内部状态机转换, 默认关