跳到主要内容

属性与状态机

提示

属性

属性类型说明
master_indexint主站实例索引
master_numberint主站编号(master_index 的别名)
slave_countint在线从站数量
slavesList[Slave]从站列表(1-based 索引映射)
is_disposedbool主站是否已被释放
link_statusEcLinkState网络链路状态
link_stateEcLinkState链路状态 (link_status 别名)
ring_modeRingMode冗余运行模式
secondary_link_okbool冗余链路是否正常
error_codeEcALStateAL Status Code,状态切换失败时的错误原因
groups_GroupAccessor组索引器
active_group_countint活跃组数量
obytesint总输出字节数
ibytesint总输入字节数
obitsint总输出位数
ibitsint总输入位数
has_dcbool是否有 DC 从站
loop_cycleintPDO 交换周期时间(纳秒,读写)
dll_versionOptional[dict]原生库版本信息
identityOptional[dict]主站身份信息(ETG.1510)
diagnosticsOptional[dict]ETG.1510 诊断数据
diagnostics_infoMasterDiagnosticsInfo高级诊断信息对象

EcLinkState

from enum import IntEnum

class EcLinkState(IntEnum):
DISCONNECTED = 0 # 无连接
CONNECTED = 1 # 主网口和冗余网口都连接
PRIMARY_ONLY = 3 # 仅主网口连接
SECONDARY_ONLY = 4 # 仅冗余网口连接

状态机管理

state

@property
def state(self) -> Optional[EcState]

获取主站当前 EtherCAT 状态(只读)。

EcState 枚举:

from enum import IntEnum

class EcState(IntEnum):
NONE = 0x00
INIT = 0x01
PRE_OP = 0x02
BOOT = 0x03
SAFE_OP = 0x04
OP = 0x08
ACK = 0x10
ERROR = 0x10

set_state()

def set_state(self, state: EcState, return_error: bool = False)

切换 EtherCAT 主站状态。这是唯一的公开同步状态机 API,内部完成跨级链式转换 (INIT → PRE_OP → SAFE_OP → OP)、每步执行 Before/After 启动参数、SafeOp/OP 进入后启动 PDO 线程,每步默认 5 秒超时。

内部自动重试

SDK 内部对每一级状态切换自动执行最多 3 次重试,每次失败后等待约 1.5 秒后再次尝试,用于吸收硬件自适应、链路抖动等瞬态问题。应用层无需自行包装 for _ in range(3): master.set_state(...) 循环,重复重试只会拉长失败路径的总耗时。

SafeOp 自动同步配置

切换到 SAFE_OP 时,SDK 按 ETG 标准为非 DC 从站自动配置同步循环计数阈值与 SyncManager 同步类型 (FreeRun 兜底),应用层无需手动写

DC 从站跳过此自动配置以避免与 DC 时基冲突。如需自定义同步策略,应在切到 SAFE_OP 之后再覆盖。

参数类型默认说明
stateEcState目标状态 (INIT / PRE_OP / SAFE_OP / OP)
return_errorboolFalseTrue 时返回 (bool, str | None) 元组

返回值:

return_error返回类型说明
False (默认)bool是否成功
Truetuple[bool, str | None]失败时第二项为中文错误描述,成功时为 None

示例:

from ethercat import EcState

# 基本用法 (只关心成功与否)
if not master.set_state(EcState.OP):
print(f"状态切换失败, 错误码: {master.error_code}")

set_state(state, return_error=True)

ok, err = master.set_state(state, return_error=True)

带错误信息的形式。返回 (ok, err_msg) 元组,便于日志或上报。

示例:

ok, err = master.set_state(EcState.OP, return_error=True)
if not ok:
print(f"状态切换失败: {err}")

set_state_async()

async def set_state_async(self, state: EcState) -> bool

异步版状态切换,不阻塞调用线程。内部通过 asyncio.run_in_executor 委托线程池执行 set_state()。仅返回布尔值;如需错误信息请使用同步版 set_state(state, return_error=True)

参数类型说明
stateEcState目标状态

返回值:

类型说明
bool是否成功

示例:

import asyncio
from ethercat import EcState

async def main():
ok = await master.set_state_async(EcState.OP)
if not ok:
print(f"状态切换失败, 错误码: {master.error_code}")

asyncio.run(main())

validate()

def validate(self) -> ValidationResult

Build 前配置预检查。检查主站配置是否满足最低要求,在调用 build() 之前使用。

返回值:

  • ValidationResult — 包含 is_valid (bool) 和 errors (List[str])

示例:

result = master.validate()
if not result.is_valid:
for err in result.errors:
print(f"配置错误: {err}")
else:
master.build()

start() / stop()

def start(self) -> None
def stop(self) -> None

启动/停止 PDO 循环线程和状态监控线程。

示例:

master.set_state(EcState.OP)
master.start()
# ... 运行 ...
master.stop()

过程数据看门狗

set_all_watchdog()

def set_all_watchdog(self, timeout_ms: int) -> int

批量设置所有从站的过程数据看门狗超时。

参数:

  • timeout_ms (int) — 超时时间(毫秒),0 = 禁用

返回值:

  • int — 成功设置的从站数量

示例:

count = master.set_all_watchdog(100)
print(f"已设置 {count} 个从站看门狗超时为 100ms")

set_all_pdi_watchdog()

def set_all_pdi_watchdog(self, timeout_ms: int) -> int

批量设置所有从站的 PDI 看门狗超时, 用于检测 ESC 与从站 MCU 之间的接口异常。

参数:

  • timeout_ms (int) — 超时时间 (毫秒), 0 = 禁用

返回值:

  • int — 成功设置的从站数量

诊断控制方法

reset_diagnostics()

def reset_diagnostics(self) -> None

清零所有诊断计数器 (帧丢失/抖动/WKC/RX 错误等), 通常在长时间观察前重置。

reset_slave_port_error_counters()

def reset_slave_port_error_counters(self, slave_index: int) -> bool

清零指定从站 4 个端口的 ESC 错误计数器 (0x300/0x302/0x304/0x306, 用于物理层链路诊断)。

参数:

  • slave_index (int) — 从站编号 (1-based)

返回值:

  • bool — 成功返回 True
配套接口

master.lock_iomap() / unlock_iomap() 可在多轴批量原子写入时禁用 mutex_protection 后手动加锁; get_expected_wkc() / set_expected_wkc() 用于可选从站场景下调整期望 WKC; record_cycle_time() / update_diagnostics_snapshot() 配合 diagnostics_info 读取实时性能指标。

热插拔自修复

在断电重插 / 更换从站的场景下,SDK 自动识别身份不符并进入保护状态,避免错误设备被误纳入控制循环。事件流程见 slave_identity_mismatch 事件

acknowledge_slave_replacement()

def acknowledge_slave_replacement(self, slave_num: int) -> bool

用户确认从站替换完毕,触发 EtherCAT 识别状态机重新探测该从站。

调用时机: 订阅 master.events.add_slave_identity_mismatch() 接收到身份不符报警后,操作员检查 / 更换设备完毕,调用本方法让 SDK 重新检测。

行为:

  • 若身份已纠正(换回正确设备 / 同型号升级 Revision)→ 自动恢复并触发 slave_online 事件
  • 若身份仍不匹配 → 再次触发 slave_identity_mismatch,回到 IDENT_REJECTED 状态

参数:

  • slave_num (int) — 从站编号(1-based,与配置一致)

返回值:

  • boolTrue=已接受并复位 FSM;False=参数无效,或从站当前不在 IDENT_REJECTED / FAILED 状态(收到 slave_identity_mismatch 事件之前调用返回 False
必须先收到事件

slave_identity_mismatch 事件触发之前调用本方法无效。从站未进入保护状态时 SDK 会持续正常探测,无需手动确认。

示例:

from ethercat import MasterEvents

events = MasterEvents(master)

def on_identity_mismatch(args):
print(f"从站 {args.slave_index} 身份不符: "
f"期望 Vendor=0x{args.expected_vendor:08X}, "
f"实际 Vendor=0x{args.actual_vendor:08X}")

# UI 弹窗: "请检查从站 N 的设备, 换回正确型号后点击确认"
if show_replacement_dialog(args.slave_index):
ok = master.acknowledge_slave_replacement(args.slave_index)
if not ok:
print("确认失败: 从站不在 IDENT_REJECTED 状态")

events.add_slave_identity_mismatch(on_identity_mismatch)
与 slave_offline / slave_online 的区别
  • slave_offline — 从站断电 / 断线(身份仍匹配),自动恢复并触发 slave_online
  • slave_identity_mismatch — 从站身份变了(换错设备 / 换同型号旧版本固件),需手动调 acknowledge_slave_replacement()

资源释放

close()

def close(self) -> None

关闭主站并释放资源 (对应 C# Close / Dispose)。等价于 dispose()

内部按以下顺序关闭:

  1. 标记主站索引失效, 阻止后续回调访问
  2. 注销全局回调注册表 (防 GC 竞争)
  3. 释放 events / diagnostics_info / config / master_od / mailbox_gateway 子对象
  4. 调用 DLL EcClose 停止所有 native 线程, 失败时退化到 Stop + Dispose
  5. 清空回调与从站缓存
master = EtherCATMaster()
try:
master.set_network(adapter)
master.set_state(EcState.OP)
# ... 运行 ...
finally:
master.close()

# 或使用上下文管理器自动关闭
with EtherCATMaster() as master:
master.set_network(adapter)
master.set_state(EcState.OP)

dispose()

def dispose(self) -> None

close() 的别名, 兼容 C# Dispose 命名习惯。

拓扑刷新

rebuild_topology()

def rebuild_topology(self) -> None

重建从站拓扑访问器 (对应 C# RebuildTopology)。在从站列表发生变化 (热插拔 / 重新扫描) 后调用, 强制 slave_topology 视图重新派生父子关系与端口映射。

# 重新扫描后刷新拓扑视图
master.set_network(adapter)
master.rebuild_topology()
for node in master.slave_topology:
print(f"slave {node.index} parent={node.parent}")

topology / topology_children() / topology_roots()

@property
def topology(self) -> List[dict]

def topology_children(self, parent_index: int, max_results: int = 64) -> List[int]
def topology_roots(self, max_results: int = 64) -> List[int]

构建网络拓扑节点列表; 查询指定父从站的子节点 / 网络根节点 (直连主站) 列表。返回的字典包含 slave_index / config_addr / parent_index / entry_port / active_ports / topology / port_type

DC 时钟扩展属性

master_dc_time

@property
def master_dc_time(self) -> int

最近一次 FRMW 取回的 64 位 DC 系统时间, 单位纳秒, 纪元 2000-01-01 00:00:00 UTC。DC 未激活或 DLL 未导出时返回 0

reference_clock_slave_index

@property
def reference_clock_slave_index(self) -> int

参考时钟从站索引 (1-based)。返回 0 表示无 DC 从站或 DLL 未导出。

主站组管理

active_group_count

@property
def active_group_count(self) -> int

活跃组数量 (映射后有效)。对齐 C# ActiveGroupCount

get_group_expected_wkc()

def get_group_expected_wkc(self, group: int) -> int

读取指定组的期望 WKC (对齐 C# GetGroupExpectedWKC)。

get_expected_wkc() / set_expected_wkc()

def get_expected_wkc(self) -> int
def set_expected_wkc(self, expected_wkc: int) -> None

读取 / 覆写全局期望 WKC, 用于含可选从站的网络场景手动调整。

# 网络含 1 个 optional 从站, 该从站不在线时
master.set_expected_wkc(master.get_expected_wkc() - 3)

IOmap 锁

lock_iomap() / unlock_iomap()

def lock_iomap(self) -> None
def unlock_iomap(self) -> None

手动加 / 解 IOmap 互斥锁, 用于在禁用 mutex_protection 时按批次原子写入多组从站输出。

master.mutex_protection = False
master.lock_iomap()
try:
for slave in master.slaves:
slave.set_output_value("ControlWord", 0x0F)
finally:
master.unlock_iomap()

状态机精细控制 (state_manager)

主站还提供 MasterStateManager 子对象, 暴露 DLL 层的细粒度状态切换。常规场景请直接使用 master.set_state()

from ethercat.master.state import MasterStateManager

mgr = MasterStateManager(master._dll, master.master_index)
mgr.set_state(EcState.PRE_OP, timeout_ms=3000) # 单步切换
mgr.set_state_sequence(EcState.OP, timeout_ms=10000) # 链式切换 + 启动参数
mgr.set_slave_state(2, EcState.SAFE_OP) # 单从站切换
slave_state = mgr.get_slave_state(2)
al_code = mgr.get_slave_al_status_code(2)

verify_startup_configuration()

def verify_startup_configuration(self) -> dict

启动配置预检查 (对应 C# VerifyStartupConfiguration), 综合检查主站状态、链路状态、各从站 AL 状态、ETG.1500 配置, 返回结果字典。

返回值:

{
'valid': bool, # 整体是否有效
'slave_count': int, # 在线从站数
'issues': list[str], # 问题描述列表 (中文)
}

示例:

result = master.state_manager.verify_startup_configuration() \
if hasattr(master, 'state_manager') else \
MasterStateManager(master._dll, master.master_index) \
.verify_startup_configuration()

if not result['valid']:
for issue in result['issues']:
print(f"配置问题: {issue}")

MasterStateManager.close()

def close(self) -> None

关闭主站并释放资源, 内部依次调用 AbortNetwork → Stop → DisposeEtherCATMaster.close() 已经调用 native 层 EcClose, 通常不需要单独调用本方法。

ESM 超时常量 (EsmTimeouts)

from ethercat import EsmTimeouts

ETG.1020 状态转换默认超时 (毫秒), 与 C# EsmTimeouts 一致。

字段默认值 (ms)说明
INIT_TO_PREOP3000Init → PreOp
PREOP_TO_SAFEOP10000PreOp → SafeOp (含 PDO 映射)
SAFEOP_TO_OP3000SafeOp → OP
OP_TO_SAFEOP200OP → SafeOp
SAFEOP_TO_PREOP200SafeOp → PreOp
PREOP_TO_INIT200PreOp → Init
TO_BOOT3000任意状态 → Boot
BOOT_TO_INIT3000Boot → Init
SDO_TIMEOUT3000SDO 通信
MAILBOX_TIMEOUT5000邮箱通信
STATE_STABILIZE_DELAY100状态切换后稳定等待

EsmTimeouts.get_transition_timeout()

@staticmethod
def get_transition_timeout(current_state: int, target_state: int) -> int

按状态值 (EcState 整型) 查表返回推荐超时 (ms)。未在表中的转换返回兜底 3000ms

from ethercat import EcState, EsmTimeouts

t = EsmTimeouts.get_transition_timeout(EcState.PRE_OP, EcState.SAFE_OP)
print(f"PreOp → SafeOp 推荐超时: {t}ms") # 10000

AL 状态码分类

ALErrorCategory

from ethercat import ALErrorCategory

class ALErrorCategory(IntEnum):
NONE = 0 # 无错误
TRANSIENT = 1 # 临时错误 - 可通过重试自动恢复
CONFIGURATION = 2 # 配置错误 - 需要重新配置
HARDWARE = 3 # 硬件错误 - 需要用户干预
UNKNOWN = 4 # 未知错误

classify_al_error()

from ethercat import classify_al_error

def classify_al_error(al_status_code: int) -> ALErrorCategory

根据 AL Status Code 判断错误类别, 参考 ETG.1020 标准。临时错误可重试, 配置错误需重新配置, 硬件错误需用户干预。

from ethercat import EcState, ALErrorCategory, classify_al_error

if not master.set_state(EcState.OP):
code = int(master.error_code)
cat = classify_al_error(code)
if cat == ALErrorCategory.TRANSIENT:
time.sleep(0.5)
master.set_state(EcState.OP) # 重试
elif cat == ALErrorCategory.CONFIGURATION:
print(f"配置错误 0x{code:04X}, 请检查 ENI / startup 参数")
elif cat == ALErrorCategory.HARDWARE:
print(f"硬件错误 0x{code:04X}, 请检查从站 / 链路")

ALErrorClassifier

from ethercat.master import ALErrorClassifier

ALErrorClassifier.classify(0x001B) # "启动错误"
ALErrorClassifier.classify(0x0042) # "邮箱错误"

按 AL Status Code 高字节范围返回中文分类描述, 适合 HMI / 日志展示, 不参与重试逻辑。返回值见下表:

范围分类
0x0000无错误
0x0001 - 0x000F通用错误
0x0010 - 0x001F启动错误
0x0020 - 0x002F邮箱错误
0x0030 - 0x003F看门狗错误
0x0040 - 0x004F同步错误
0x0050 - 0x005F分布式时钟错误
0x0060 - 0x006FMBX/EoE 错误
classify_al_error() 的差异
  • classify_al_error() — 返回 ALErrorCategory 枚举, 用于编程决策 (是否重试)
  • ALErrorClassifier.classify() — 返回中文字符串, 用于人机展示

应用层一般两者并用: 用枚举决定重试逻辑, 用字符串展示给操作员。

静态工具

EtherCATMaster.emergency_cleanup()

@staticmethod
def emergency_cleanup(dll_path: Optional[str] = None) -> None

紧急释放所有主站资源 (对应 C# EmergencyCleanup)。在进程崩溃 / 异常退出兜底场景使用, 直接关闭所有网卡句柄, 无需主站实例。

import atexit
from ethercat import EtherCATMaster

atexit.register(EtherCATMaster.emergency_cleanup)

EtherCATMaster.get_active_instance_count() / get_max_instance_count()

@staticmethod
def get_active_instance_count(dll_path: Optional[str] = None) -> int

@staticmethod
def get_max_instance_count(dll_path: Optional[str] = None) -> int

查询当前活跃主站实例数 / 最大可创建主站实例数。最大值由可用隔离核心数和驱动上限决定:

  • Linux: 由 install.shisolcpus 参数决定
  • Windows: 总核心数 - 1 (驱动运行时隔离)
print(f"活跃实例: {EtherCATMaster.get_active_instance_count()}")
print(f"最大可创建: {EtherCATMaster.get_max_instance_count()}")

EtherCATMaster.is_adapter_available()

@staticmethod
def is_adapter_available(adapter: str, dll_path: Optional[str] = None) -> bool

检查指定网卡是否可用 (NDIS 协议未占用且能 BIND), 用于 UI 选网卡前的预检。

EtherCATMaster.validate_timer_accuracy()

@staticmethod
def validate_timer_accuracy(expected_usec: int, tolerance_usec: int,
dll_path: Optional[str] = None) -> int

验证 OS 高精度定时器实际精度。返回 0 表示在容差内, 非零表示超出容差; Windows 上若返回非零, 多半是高精度定时器服务未启用或 BIOS 关闭了 HPET。

日志开关 (静态方法)

EtherCATMaster.set_pdo_logging(True)       # 开 PDO 帧级日志
EtherCATMaster.set_mailbox_logging(True) # 开邮箱日志
EtherCATMaster.set_debug_logging(True) # 开调试日志

或使用模块级函数 (对齐 C# Static.Other):

from ethercat.statics import (
initialize_logging, enable_pdo_logging,
enable_mailbox_logging, enable_debug_logging,
)
from ethercat.utils import DarraCoreDLL

dll = DarraCoreDLL()
initialize_logging(dll)
enable_pdo_logging(dll, True)
enable_mailbox_logging(dll, True)
性能影响
  • PDO 日志 — 每周期一行, 高频时显著拖慢循环, 仅调试期使用
  • 邮箱日志 — 每次 SDO/FoE/EoE 交互一行, 一般可常开
  • 调试日志 — 内部状态机转换, 默认关