跳到主要内容

AoE (ADS over EtherCAT)

AoE 协议实现了 ADS (Automation Device Specification) over EtherCAT 通信,支持 Beckhoff TwinCAT 和类似设备的 ADS 通信。

通过 slave.aoe 访问。从站不支持 AoE 时为 None

设备信息

read_device_info()

def read_device_info(self) -> Optional[dict]

读取 ADS 设备信息。

返回值:

  • dict | None — 设备信息字典,失败返回 None
{
'major': int, # 主版本号
'minor': int, # 次版本号
'build': int, # 构建号
'name': str, # 设备名称
}

示例:

info = slave.aoe.read_device_info()
if info is not None:
print(f"设备: {info['name']} v{info['major']}.{info['minor']}.{info['build']}")

read_state()

def read_state(self) -> Optional[Tuple[int, int]]

读取 ADS 状态,返回 (ads_state, device_state) 元组,失败返回 None

相关结构:

class AdsState(IntEnum):
Invalid = 0 # 无效状态
Idle = 1 # 空闲
Reset = 2 # 复位
Init = 3 # 初始化
Start = 4 # 启动
Run = 5 # 运行
Stop = 6 # 停止
SaveConfig = 7 # 保存配置
LoadConfig = 8 # 加载配置
PowerFailure = 9 # 电源故障
PowerGood = 10 # 电源正常
Error = 11 # 错误
Shutdown = 12 # 关闭
Suspend = 13 # 挂起
Resume = 14 # 恢复
Config = 15 # 配置
Reconfig = 16 # 重新配置

示例:

result = slave.aoe.read_state()
if result is not None:
ads_state, device_state = result
print(f"ADS 状态: {ads_state}, 设备状态: {device_state}")

write_control()

def write_control(self, ads_state: int, device_state: int,
data: bytes = b'') -> bool

写入控制命令(切换设备状态)。

get_ads_state_name()

def get_ads_state_name(self, ads_state: int) -> str

获取 ADS 状态名称。

示例:

result = slave.aoe.read_state()
if result:
ads_state, device_state = result
name = slave.aoe.get_ads_state_name(ads_state)
print(f"ADS 状态: {name} ({ads_state})")

数据读写

read()

def read(self, index_group: int, index_offset: int,
length: int) -> Optional[bytes]

读取 ADS 数据。

参数:

  • index_group (int) — 索引组
  • index_offset (int) — 索引偏移
  • length (int) — 读取长度

返回值:

  • bytes | None — 读取的数据,失败返回 None

示例:

data = slave.aoe.read(0x4020, 0, 4)
if data is not None:
import struct
value = struct.unpack('<I', data)[0]
print(f"状态值: 0x{value:08X}")

write()

def write(self, index_group: int, index_offset: int,
data: bytes) -> bool

写入 ADS 数据。

参数:

  • index_group (int) — 索引组
  • index_offset (int) — 索引偏移
  • data (bytes) — 写入数据

返回值:

  • bool — 成功返回 True

示例:

import struct
slave.aoe.write(0x4020, 0, struct.pack('<I', 0x0006))

read_write()

def read_write(self, index_group: int, index_offset: int,
read_length: int, write_data: bytes = b'') -> Optional[bytes]

ADS 读写操作(同时读写数据)。

示例:

# 读取数据
data = slave.aoe.read_write(0x4020, 0, 4)
if data is not None:
import struct
value = struct.unpack('<I', data)[0]
print(f"值: 0x{value:08X}")

# 写入数据(read_length=0 表示仅写入)
slave.aoe.read_write(0x4020, 0, 0, struct.pack('<I', 0x0006))

send_command()

def send_command(self, target_port: int, command_id: int,
command_data: bytes = b'') -> Optional[bytes]

发送 AoE 命令并接收响应。

参数:

  • target_port (int) — 目标 AMS 端口
  • command_id (int) — ADS 命令 ID
  • command_data (bytes) — 命令数据

返回值:

  • bytes | None — 响应数据,失败返回 None

AoEResultCode 枚举

ADS / AoE 操作的标准结果错误码 (ETG.1020 Table 16). 数值与 C/C++/C#/Java/Rust SDK 完全一致, 高字节通常表示错误类 (0x0700 ADS device, 0x0740 ADS client).

from ethercat.slave.aoe import AoEResultCode
名称说明
NO_ERROR0x0000无错误
INTERNAL_ERROR0x0001内部错误
NO_REALTIME0x0002无实时上下文
ALLOCATION_LOCKED_MEMORY_ERROR0x0003锁定内存分配失败
INSERT_MAILBOX_ERROR0x0004邮箱已满
WRONG_RECEIVE_HMSG0x0005命令 ID 不匹配
TARGET_PORT_NOT_FOUND0x0006目标 AMS 端口未找到
TARGET_MACHINE_NOT_FOUND0x0007目标设备未找到
UNKNOWN_COMMAND_ID0x0008未知命令 ID
BAD_TASK_ID0x0009无效 Task ID
NO_IO0x000A无 IO 通道
UNKNOWN_AMS_COMMAND0x000B未知 AMS 命令
WIN32_ERROR0x000CWin32 错误
PORT_NOT_CONNECTED0x000D端口未连接
INVALID_AMS_LENGTH0x000E无效 AMS 长度
INVALID_AMS_NET_ID0x000F无效 AMS Net ID
AMS_SYNC_TIMEOUT0x0015AMS 同步超时
HOST_UNREACHABLE0x001B主机不可达
WSA_SERVICE_NOT_STARTED0x274D设备服务未启动
DEVICE_INVALID_GROUP0x0700设备无效 IndexGroup
DEVICE_INVALID_OFFSET0x0701设备无效 IndexOffset
DEVICE_INVALID_ACCESS0x0702设备访问无效
DEVICE_INVALID_SIZE0x0703设备数据大小无效
DEVICE_NOT_READY0x0705设备未就绪
DEVICE_BUSY0x0706设备忙
DEVICE_NOT_FOUND0x0708设备不存在
DEVICE_INVALID_STATE0x070C设备状态无效
DEVICE_TIMEOUT0x0712设备超时
DEVICE_ACCESS_DENIED0x071C设备访问被拒绝
CLIENT_INVALID_PARAM0x0741客户端参数无效
CLIENT_INVOKE_TIMEOUT0x0744客户端调用超时
CLIENT_PORT_NOT_OPEN0x0745客户端端口未打开

完整 70+ 项错误码请参见枚举源码; 上表仅列常用条目。

示例:

from ethercat.slave.aoe import AoEResultCode

result = slave.aoe.read_state()
if result is None:
# 通过 read_device_info / read / write 等返回 None 时
# 后续读取 last_result_code 判断 (若 SDK 暴露) 或直接对照 AoEResultCode
print(f"AoE 失败, 错误码可能为 {AoEResultCode.DEVICE_NOT_READY.name}")

AoE 通知

add_notification()

def add_notification(self, index_group: int, index_offset: int,
length: int, trans_mode: int = 0,
max_delay: int = 0, cycle_time: int = 0) -> Optional[int]

添加 ADS 通知,返回通知句柄。

del_notification()

def del_notification(self, handle: int) -> bool

删除 ADS 通知。

高级订阅管理

aoe.py 模块中提供 AoESubscriptionManager 类,封装了更便捷的订阅管理功能:

from ethercat import AoESubscriptionManager

mgr = AoESubscriptionManager(slave.aoe)
handle = mgr.subscribe(0x4020, 0, 4, callback=on_data_changed)
mgr.unsubscribe(handle)
mgr.unsubscribe_all()

路由配置

set_config()

def set_config(self, target_net_id: bytes, target_port: int,
source_net_id: bytes, source_port: int) -> bool

设置 AoE 路由配置。NetID 为 6 字节。

get_config()

def get_config(self) -> Optional[dict]

获取 AoE 路由配置,返回字典:

{
'target_net_id': bytes, # 目标 NetID (6 字节)
'target_port': int, # 目标端口
'source_net_id': bytes, # 源 NetID (6 字节)
'source_port': int, # 源端口
}

跨协议网关

通过 AoE 路由访问其他邮箱协议(ETG.1020),支持 CoE 和 SoE 协议的透明转发。

read_coe_via_aoe()

def read_coe_via_aoe(self, index: int, subindex: int,
read_length: int) -> Optional[bytes]

通过 AoE 路由读取 CoE 对象(IndexGroup=0xF302,IndexOffset=(index << 16) | subindex)。

参数:

  • index (int) — CoE 对象索引
  • subindex (int) — CoE 子索引
  • read_length (int) — 期望读取长度

返回值:

  • bytes | None — 读取的数据,失败返回 None

write_coe_via_aoe()

def write_coe_via_aoe(self, index: int, subindex: int,
data: bytes) -> bool

通过 AoE 路由写入 CoE 对象(IndexGroup=0xF302)。

read_soe_via_aoe()

def read_soe_via_aoe(self, idn: int, read_length: int) -> Optional[bytes]

通过 AoE 路由读取 SoE IDN(IndexGroup=0xF420,IndexOffset=IDN 编号)。

write_soe_via_aoe()

def write_soe_via_aoe(self, idn: int, data: bytes) -> bool

通过 AoE 路由写入 SoE IDN(IndexGroup=0xF420)。

示例:

# 通过 AoE 读取 CoE 对象 0x6041:0(状态字)
data = slave.aoe.read_coe_via_aoe(0x6041, 0, 2)
if data is not None:
import struct
status_word = struct.unpack('<H', data)[0]
print(f"状态字: 0x{status_word:04X}")

# 通过 AoE 写入 CoE 对象 0x6040:0(控制字)
slave.aoe.write_coe_via_aoe(0x6040, 0, struct.pack('<H', 0x000F))

# 通过 AoE 读取 SoE IDN
idn_data = slave.aoe.read_soe_via_aoe(32, 4)

initialize_slave_net_id()

def initialize_slave_net_id(self, net_id: bytes) -> bool

初始化从站 AoE Net ID(ETG.1020 §9.4)。在 INIT→PreOp 状态切换期间调用。

参数:

  • net_id (bytes) — 6 字节的 AMS Net ID

返回值:

  • bool — 成功返回 True

异常:

  • ValueError — Net ID 不是 6 字节

示例:

slave.aoe.initialize_slave_net_id(bytes([5, 80, 187, 177, 1, 1]))

高级订阅管理

AoESubscriptionManager 类封装了完整的订阅管理功能,支持按 ID 管理订阅。

AoETransmissionMode

class AoETransmissionMode(IntEnum):
NO_TRANSMISSION = 0 # 无通知
CYCLIC = 1 # 循环通知 - 按指定周期发送
ON_CHANGE = 2 # 变化通知 - 数据变化时发送
CYCLIC_IN_DEVICE = 3 # 设备端循环通知
ON_CHANGE_IN_DEVICE = 4 # 设备端变化通知

AoESubscription

@dataclass
class AoESubscription:
handle: int # 通知句柄
slave_index: int # 从站索引
index_group: int # 索引组
index_offset: int # 索引偏移
length: int # 数据长度
callback: Optional[Callable] # 回调函数
subscription_id: str # 唯一订阅 ID (UUID)
trans_mode: int # 传输模式
is_active: bool # 是否活跃

示例:

from ethercat import AoESubscriptionManager, AoETransmissionMode

mgr = AoESubscriptionManager(slave.aoe)

# 订阅数据变化通知
sub_id = mgr.subscribe(
index_group=0x4020, index_offset=0, length=4,
trans_mode=AoETransmissionMode.ON_CHANGE,
cycle_time_ms=100,
callback=lambda data: print(f"数据变化: {data.hex()}")
)

# 获取活跃订阅列表
active = mgr.get_active_subscriptions()
print(f"活跃订阅数: {mgr.active_count}")

# 按 ID 取消订阅
mgr.unsubscribe(sub_id)

# 取消所有订阅
mgr.unsubscribe_all()

订阅管理器属性

属性类型说明
active_countint当前活跃订阅数量
all_handlesList[int]所有活跃通知句柄列表

按句柄 / ID 查询订阅

def get_subscription(self, handle: int) -> Optional[AoESubscription]
def get_subscription_by_id(self, subscription_id: str) -> Optional[AoESubscription]
def unsubscribe_by_handle(self, handle: int) -> bool

get_subscription() 兼容旧句柄风格的查询; get_subscription_by_id() 推荐用于新代码 (UUID 不会与已删除的句柄冲突)。

sub = mgr.get_subscription_by_id(sub_id)
if sub and sub.is_active:
print(f"订阅 slave={sub.slave_index} group=0x{sub.index_group:08X}")

异步订阅与读写 (asyncio)

AoE 邮箱往返延迟通常 10-50 ms, 同步调用会阻塞事件循环。SDK 在专用线程池上提供异步版本, 对齐 C# *Async 接口。

subscribe_async() / unsubscribe_async() / unsubscribe_all_async()

async def subscribe_async(self, index_group: int, index_offset: int,
length: int,
trans_mode: int = AoETransmissionMode.ON_CHANGE,
max_delay_ms: int = 0, cycle_time_ms: int = 100,
callback: Optional[Callable[[bytes], None]] = None,
timeout_ms: Optional[int] = None) -> Optional[str]

async def unsubscribe_async(self, subscription_id: str,
timeout_ms: Optional[int] = None) -> bool

async def unsubscribe_all_async(self,
timeout_ms: Optional[int] = None) -> int

订阅 / 取消订阅的异步版本。timeout_ms 触发 asyncio.TimeoutError, 后台线程仍会跑完单次同步调用。

import asyncio
from ethercat import AoESubscriptionManager, AoETransmissionMode

async def main():
mgr = AoESubscriptionManager(slave.aoe)
sub_id = await mgr.subscribe_async(
index_group=0x4020, index_offset=0, length=4,
trans_mode=AoETransmissionMode.ON_CHANGE,
callback=lambda data: print(f"通知: {data.hex()}"),
timeout_ms=2000,
)
if sub_id is None:
print("订阅失败")
return

await asyncio.sleep(10)
await mgr.unsubscribe_async(sub_id)

asyncio.run(main())

read_async() / write_async() / read_write_async()

from ethercat.slave.aoe import read_async, write_async, read_write_async

async def main():
data = await read_async(slave.aoe, 0x4020, 0, 4, timeout_ms=500)
if data is not None:
print(f"值: {data.hex()}")
await write_async(slave.aoe, 0x4020, 0, b'\x06\x00\x00\x00',
timeout_ms=500)

read_write_async() 优先调用底层 read_write (一次邮箱往返完成读写), 不可用时降级为先 write_asyncread_async

shutdown_aoe_async_executor()

from ethercat.slave.aoe import shutdown_aoe_async_executor

shutdown_aoe_async_executor(wait=True)

进程退出前关闭 AoE 协议专用线程池, 等待 (wait=True) 或不等待已提交任务完成。

完整示例

数据读写

if slave.aoe is not None:
import struct

# 设备信息
info = slave.aoe.read_device_info()
if info:
print(f"设备: {info['name']} v{info['major']}.{info['minor']}.{info['build']}")

# 状态
result = slave.aoe.read_state()
if result:
ads_state, device_state = result
name = slave.aoe.get_ads_state_name(ads_state)
print(f"ADS 状态: {name} ({ads_state}), 设备状态: {device_state}")

# 读取数据
data = slave.aoe.read(0x4020, 0, 4)
if data:
print(f"值: 0x{struct.unpack('<I', data)[0]:08X}")

# 写入数据
slave.aoe.write(0x4020, 0, struct.pack('<I', 0x0006))

# 切换设备状态
slave.aoe.write_control(5, 0) # 切换到 Run 状态

数据订阅

from ethercat import AoESubscriptionManager, AoETransmissionMode

if slave.aoe is not None:
mgr = AoESubscriptionManager(slave.aoe)

sub_id = mgr.subscribe(
index_group=0x4020, index_offset=0, length=4,
trans_mode=AoETransmissionMode.ON_CHANGE,
cycle_time_ms=100,
callback=lambda data: print(f"数据变化: {data.hex()}")
)

import time
time.sleep(10)
mgr.unsubscribe_all()

跨协议网关

if slave.aoe is not None:
import struct

# 通过 AoE 读取 CoE 对象 0x6041:0(状态字)
data = slave.aoe.read_coe_via_aoe(0x6041, 0, 2)
if data is not None:
status_word = struct.unpack('<H', data)[0]
print(f"状态字: 0x{status_word:04X}")

# 通过 AoE 写入 CoE 对象 0x6040:0(控制字)
slave.aoe.write_coe_via_aoe(0x6040, 0, struct.pack('<H', 0x000F))

# 通过 AoE 读取 SoE IDN
idn_data = slave.aoe.read_soe_via_aoe(32, 4)
if idn_data is not None:
print(f"IDN 32 值: {struct.unpack('<I', idn_data)[0]}")