主站诊断
通过 master.diagnostics_info 访问高级诊断功能(帧计数、抖动、丢包率等 MasterDiagnosticsInfo 对象)。
通过 master.diagnostics 访问 ETG.1510 基础诊断数据(返回 dict,包含 cyclic_lost_frames、acyclic_lost_frames、cyclic_fps、acyclic_fps、state)。
建议通过 事件 驱动异常处理,而非自行轮询。 直接读取诊断属性适用于 UI 显示等场景。
单个从站的状态诊断、链路质量请参考 从站诊断。
功能概览
| 功能 | 说明 |
|---|---|
| 通信与性能统计 | 帧计数、丢包、抖动、PDO 丢帧、网口状态、拓扑 |
| DC 同步 | 同步窗口阈值、dc_sync_lost 事件 |
| 冗余状态 | 冗余激活、故障点检测 |
| 诊断控制 | 启停数据采集、重置统计 |
通信与性能统计
| 属性 | 类型 | 需启用 | 说明 |
|---|---|---|---|
| rt_cnt | int | 是 | 每秒帧数(Hz),5秒平均 |
| error_cnt | int | 是 | 每秒错误数,5秒平均 |
| packet_loss_rate | float | 是 | 丢包率(0.0~1.0)— TX vs RX 5 秒滑窗, pipeline 在途不算丢 |
| late_frame_rate | float | 是 | 过慢帧率(0.0~1.0)— idx 出 8 帧窗 stale, 不计入丢包 |
| cycle_time_span | int | 是 | 实际周期时间(微秒) |
| avg_jitter_us | float | 是 | 最近5秒平均抖动(微秒),总线抖动 |
| max_jitter_us | float | 是 | 最近5秒最大抖动(微秒),总线抖动 |
| pdo.total_lost | int | — | 累计丢帧数(所有组合计) |
| pdo.consecutive_lost | int | — | 当前连续丢帧数 |
| pdo.get_frame_loss_stats(group) | PDOFrameLossStats | — | 指定组的丢帧统计 |
| worst_slave_index | int | — | 异常率最高的从站索引 |
| worst_link_quality | int | — | 最差从站的通信健康度(%) |
| primary_wkc | int | — | 主网口工作计数器(冗余模式下独立跟踪) |
| secondary_wkc | int | — | 副网口工作计数器(冗余模式下独立跟踪) |
| primary_port_ok | bool | — | 主端口是否正常 |
| secondary_port_ok | bool | — | 副端口是否正常 |
| primary_port_errors | int | — | 主端口最近5秒错误数 |
| secondary_port_errors | int | — | 副端口最近5秒错误数 |
| topology_description | str | — | 拓扑模式描述 |
| timing_mode | str | — | 定时模式("硬件定时器" / "RT就绪" / "降级" / "RT错误")。WDK 驱动必须就绪 |
PDOFrameLossStats:
@dataclass
class PDOFrameLossStats:
total_lost: int # 累计丢帧数
consecutive_lost: int # 当前连续丢帧数
max_consecutive: int # 历史最大连续丢帧数
诊断快照
get_snapshot()
def get_snapshot(self) -> DiagnosticsSnapshot
获取诊断数据的一致快照。返回当前时刻所有诊断指标的冻结快照(不可变 dataclass),适用于日志记录、UI 刷新、跨线程传递等场景。
DiagnosticsSnapshot 数据类:
@dataclass(frozen=True)
class DiagnosticsSnapshot:
frequency: int # 每秒帧数 (Hz)
error_count: int # 每秒错误数
packet_loss_rate: float # 丢包率 (0.0-1.0) — TX vs RX 5s 滑窗
late_frame_rate: float # 过慢帧率 (0.0-1.0) — idx 出 8 帧窗 stale, 不计丢
avg_jitter_us: float # 平均抖动 (µs)
max_jitter_us: float # 最大抖动 (µs)
cycle_time_us: int # 实际周期时间 (µs)
wkc_actual: int # 当前 WKC
wkc_expected: int # 期望 WKC
primary_port_ok: bool # 主端口正常
secondary_port_ok: bool # 副端口正常
redundancy_active: bool # 冗余激活
示例:
diag = master.diagnostics_info
diag.enabled = True
snap = diag.get_snapshot()
print(f"频率: {snap.frequency} Hz, 丢包率: {snap.packet_loss_rate:.2%}")
print(f"WKC: {snap.wkc_actual}/{snap.wkc_expected}")
print(f"抖动: 平均 {snap.avg_jitter_us:.2f} us, 最大 {snap.max_jitter_us:.2f} us")
print(f"冗余: {'激活' if snap.redundancy_active else '未激活'}")
计算公式:
rt_cnt= 采样周期帧数 / 窗口秒数 — 滑动窗口平均帧频error_cnt= 采样周期错误数 / 窗口秒数 — 滑动窗口平均错误率packet_loss_rate= (TX - RX - pipeline) / TX — 5 秒滑窗, pipeline 在途不算丢late_frame_rate= LateDrop / TX — idx 出 8 帧窗 stale, 不计入丢包
需 enabled = True。
示例:
diag = master.diagnostics_info
diag.enabled = True # 启用诊断数据采集
print(f"帧频: {diag.rt_cnt} Hz")
print(f"丢包率: {diag.packet_loss_rate:.2%}")
print(f"错误数: {diag.error_cnt}")
print(f"周期时间: {diag.cycle_time_span} us")
print(f"抖动: 平均 {diag.avg_jitter_us:.2f} us, 最大 {diag.max_jitter_us:.2f} us")
# PDO 丢帧
pdo = diag.pdo
print(f"PDO 丢帧: 累计={pdo.total_lost}, 连续={pdo.consecutive_lost}")
# 按组查询
stats0 = pdo.get_frame_loss_stats(0)
stats1 = pdo.get_frame_loss_stats(1)
print(f"组0丢帧: {stats0.total_lost}, 组1丢帧: {stats1.total_lost}")
# 从站异常
print(f"最差从站: #{diag.worst_slave_index} ({diag.worst_link_quality}%)")
# 网口状态
print(f"主端口: {'正常' if diag.primary_port_ok else '异常'}")
print(f"副端口: {'正常' if diag.secondary_port_ok else '未连接'}")
# 拓扑信息
print(f"拓扑: {diag.topology_description}")
print(f"定时: {diag.timing_mode}")
每个从站的 ESC 端口错误通过 slave.diagnostics.read_port_errors() 获取。详见 从站诊断 - 通信诊断。
DC 同步
自动监控(ETG.1500 5.13.3),每秒检查各从站时间偏差。超出 sync_window_threshold 阈值时触发 dc_sync_lost 事件。
单个从站的同步状态请使用 slave.diagnostics.dc.is_in_sync 和 slave.diagnostics.dc.sync_time_difference。
sync_window_threshold
@property
def sync_window_threshold(self) -> int
@sync_window_threshold.setter
def sync_window_threshold(self, value: int) -> None
同步窗口阈值(纳秒),默认 1000ns。超出阈值触发 dc_sync_lost 事件。
冗余状态
| 属性 | 类型 | 说明 |
|---|---|---|
| redundancy_active | bool | 冗余是否激活 |
| break_point | Optional[dict] | 当前故障点(count, slave_index, port, type) |
RingMode 枚举:
class RingMode(IntEnum):
INACTIVE = 0 # 未激活
DUAL = 1 # 双向冗余
DEGRADED = 2 # 降级模式
break_point 返回字典,包含以下字段:
{
'count': int, # 故障点数量
'slave_index': int, # 故障从站索引 (1-based)
'port': int, # 故障端口号 (0-3)
'type': int, # 故障类型:0=断线,1=CRC 故障
}
break_point 统一检测两类物理故障:
| 类型 | fault_type | 检测方式 | 典型场景 |
|---|---|---|---|
| 断线 | 0 | DL Status 端口物理链路丢失 | 拔线、线缆断裂 |
| CRC 故障 | 1 | 端口级 RxError + InvalidFrame 持续增长 | 接触不良、线缆老化、连接器氧化 |
故障线缆段定位: 当相邻从站的对向端口(如从站 N 的 P1 和从站 N+1 的 P0)同时报故障,说明连接线缆有问题。仅单侧报故障则定位到该端口连接器。
单个从站的冗余状态请参考 从站诊断 - 冗余诊断。
示例:
diag = master.diagnostics_info
print(f"冗余模式: {master.ring_mode.name}")
if master.ring_mode == RingMode.DEGRADED:
print("警告: secondary链路不可用")
if diag.redundancy_active:
print("冗余已激活")
bp = diag.break_point
if bp is not None:
print(f"故障: 从站{bp['slave_index']} P{bp['port']} "
f"{'断线' if bp['type'] == 0 else 'CRC故障'}")
if bp['type'] == 1:
print("建议检查线缆/连接器")
诊断控制
Reset (诊断计数器)
@property
def enabled(self) -> bool # 诊断数据采集开关 (默认关闭)
def reset(self) -> None # 一次性重置所有诊断统计
enabled 控制诊断数据采集开关 (默认关闭)。启用后周期性采样, 记录标记为"需启用"的统计数据; 其他功能 (PDO 丢帧、从站异常、网口状态、拓扑等) 始终活跃。reset() 一次性清零所有诊断计数器。
AL 错误分类
对 AL Status Code 进行分类,帮助快速判断错误性质和处理策略。
classify_al_error()
def classify_al_error(al_status_code: int) -> ALErrorCategory
对 AL Status Code(从站返回的错误码)进行分类,帮助快速判断错误性质和处理策略。
参数:
al_status_code(int) — AL Status Code,从slave.error_code或状态转换失败时获取
返回值:
ALErrorCategory— 错误分类枚举
ALErrorCategory 枚举:
class ALErrorCategory(IntEnum):
NONE = 0 # 无错误
TRANSIENT = 1 # 瞬态错误,可重试状态转换,通常自动恢复
CONFIGURATION = 2 # 配置错误,检查 PDO 映射、SM 配置、Startup 参数等
HARDWARE = 3 # 硬件错误,检查从站硬件、线缆、电源
UNKNOWN = 4 # 未知错误,查阅 ETG.1000 或从站手册
示例:
from ethercat import classify_al_error, ALErrorCategory
slave = master[1]
if slave.error_code != 0:
category = classify_al_error(slave.error_code)
print(f"从站 {slave.slave_num} 错误 0x{slave.error_code:04X}: {category.name}")
if category == ALErrorCategory.TRANSIENT:
print("瞬态错误,尝试重新切换状态...")
elif category == ALErrorCategory.CONFIGURATION:
print("配置错误,请检查 PDO/SM 配置")
elif category == ALErrorCategory.HARDWARE:
print("硬件错误,请检查从站设备")
0x001E无效输入映射 — CONFIGURATION0x001D无效输出映射 — CONFIGURATION0x0011无效邮箱配置 — CONFIGURATION0x002D同步错误 — TRANSIENT0x0032DC 同步超时 — TRANSIENT0x0050EEPROM 错误 — HARDWARE
从站错误计数器
read_slave_error_counters()
def read_slave_error_counters(self, slave_index: int) -> dict
读取指定从站的错误计数器。
参数:
slave_index(int) — 从站索引(1-based)
返回值:
dict— 错误计数器字典,包含以下字段:
{
'slave_index': int, # 从站编号
'rx_error': List[int], # 各端口 RX 错误计数 [Port0-3]
'invalid_frame': List[int], # 各端口无效帧计数 [Port0-3]
'lost_link': List[int], # 各端口链路丢失计数 [Port0-3]
}
示例:
counters = master.diagnostics_info.read_slave_error_counters(1)
if any(v > 0 for v in counters['rx_error']):
print(f"从站 1 错误计数:")
print(f" RX 错误: {counters['rx_error']}")
print(f" 无效帧: {counters['invalid_frame']}")
print(f" 链路丢失: {counters['lost_link']}")
诊断消息
read_diagnostic_messages()
from ethercat.slave.coe import read_diagnostic_messages
def read_diagnostic_messages(coe: CoE) -> List[DiagnosticMessage]
通过 CoE 读取从站对象 0x10F3(诊断历史对象,ETG.1020)中的诊断消息。返回从站记录的诊断事件列表,包含时间戳、错误码和描述信息。
参数:
coe(CoE) — 从站的 CoE 接口(slave.coe)
返回值:
List[DiagnosticMessage]— 诊断消息列表,无消息时返回空列表
示例:
from ethercat.slave.coe import read_diagnostic_messages
for slave in master.slaves:
if slave.coe is None:
continue
messages = read_diagnostic_messages(slave.coe)
for msg in messages:
print(f"[从站 {slave.slave_num}] 代码=0x{msg.diag_code:08X}, {msg}")
并非所有从站都支持 0x10F3 诊断历史对象。不支持的从站调用时返回空列表。此方法通过 SDO 读取,不建议在实时路径中高频调用。