AoE (ADS over EtherCAT)
AoE 协议实现了 ADS (Automation Device Specification) over EtherCAT 通信,支持 Beckhoff TwinCAT 和类似设备的 ADS 通信。
通过 slave.aoe 访问。从站不支持 AoE 时为 None。
设备信息
read_device_info()
def read_device_info(self) -> Optional[dict]
读取 ADS 设备信息。
返回值:
dict | None— 设备信息字典,失败返回None
{
'major': int, # 主版本号
'minor': int, # 次版本号
'build': int, # 构建号
'name': str, # 设备名称
}
示例:
info = slave.aoe.read_device_info()
if info is not None:
print(f"设备: {info['name']} v{info['major']}.{info['minor']}.{info['build']}")
read_state()
def read_state(self) -> Optional[Tuple[int, int]]
读取 ADS 状态,返回 (ads_state, device_state) 元组,失败返回 None。
相关结构:
class AdsState(IntEnum):
Invalid = 0 # 无效状态
Idle = 1 # 空闲
Reset = 2 # 复位
Init = 3 # 初始化
Start = 4 # 启动
Run = 5 # 运行
Stop = 6 # 停止
SaveConfig = 7 # 保存配置
LoadConfig = 8 # 加载配置
PowerFailure = 9 # 电源故障
PowerGood = 10 # 电源正常
Error = 11 # 错误
Shutdown = 12 # 关闭
Suspend = 13 # 挂起
Resume = 14 # 恢复
Config = 15 # 配置
Reconfig = 16 # 重新配置
示例:
result = slave.aoe.read_state()
if result is not None:
ads_state, device_state = result
print(f"ADS 状态: {ads_state}, 设备状态: {device_state}")
write_control()
def write_control(self, ads_state: int, device_state: int,
data: bytes = b'') -> bool
写入控制命令(切换设备状态)。
get_ads_state_name()
def get_ads_state_name(self, ads_state: int) -> str
获取 ADS 状态名称。
示例:
result = slave.aoe.read_state()
if result:
ads_state, device_state = result
name = slave.aoe.get_ads_state_name(ads_state)
print(f"ADS 状态: {name} ({ads_state})")
数据读写
read()
def read(self, index_group: int, index_offset: int,
length: int) -> Optional[bytes]
读取 ADS 数据。
参数:
index_group(int) — 索引组index_offset(int) — 索引偏移length(int) — 读取长度
返回值:
bytes | None— 读取的数据,失败返回None
示例:
data = slave.aoe.read(0x4020, 0, 4)
if data is not None:
import struct
value = struct.unpack('<I', data)[0]
print(f"状态值: 0x{value:08X}")
write()
def write(self, index_group: int, index_offset: int,
data: bytes) -> bool
写入 ADS 数据。
参数:
index_group(int) — 索引组index_offset(int) — 索引偏移data(bytes) — 写入数据
返回值:
bool— 成功返回True
示例:
import struct
slave.aoe.write(0x4020, 0, struct.pack('<I', 0x0006))
read_write()
def read_write(self, index_group: int, index_offset: int,
read_length: int, write_data: bytes = b'') -> Optional[bytes]
ADS 读写操作(同时读写数据)。
示例:
# 读取数据
data = slave.aoe.read_write(0x4020, 0, 4)
if data is not None:
import struct
value = struct.unpack('<I', data)[0]
print(f"值: 0x{value:08X}")
# 写入数据(read_length=0 表示仅写入)
slave.aoe.read_write(0x4020, 0, 0, struct.pack('<I', 0x0006))
send_command()
def send_command(self, target_port: int, command_id: int,
command_data: bytes = b'') -> Optional[bytes]
发送 AoE 命令并接收响应。
参数:
target_port(int) — 目标 AMS 端口command_id(int) — ADS 命令 IDcommand_data(bytes) — 命令数据
返回值:
bytes | None— 响应数据,失败返回None
AoEResultCode 枚举
ADS / AoE 操作的标准结果错误码 (ETG.1020 Table 16). 数值与 C/C++/C#/Java/Rust SDK 完全一致, 高字节通常表示错误类 (0x0700 ADS device, 0x0740 ADS client).
from ethercat.slave.aoe import AoEResultCode
| 名称 | 值 | 说明 |
|---|---|---|
| NO_ERROR | 0x0000 | 无错误 |
| INTERNAL_ERROR | 0x0001 | 内部错误 |
| NO_REALTIME | 0x0002 | 无实时上下文 |
| ALLOCATION_LOCKED_MEMORY_ERROR | 0x0003 | 锁定内存分配失败 |
| INSERT_MAILBOX_ERROR | 0x0004 | 邮箱已满 |
| WRONG_RECEIVE_HMSG | 0x0005 | 命令 ID 不匹配 |
| TARGET_PORT_NOT_FOUND | 0x0006 | 目标 AMS 端口未找到 |
| TARGET_MACHINE_NOT_FOUND | 0x0007 | 目标设备未找到 |
| UNKNOWN_COMMAND_ID | 0x0008 | 未知命令 ID |
| BAD_TASK_ID | 0x0009 | 无效 Task ID |
| NO_IO | 0x000A | 无 IO 通道 |
| UNKNOWN_AMS_COMMAND | 0x000B | 未知 AMS 命令 |
| WIN32_ERROR | 0x000C | Win32 错误 |
| PORT_NOT_CONNECTED | 0x000D | 端口未连接 |
| INVALID_AMS_LENGTH | 0x000E | 无效 AMS 长度 |
| INVALID_AMS_NET_ID | 0x000F | 无效 AMS Net ID |
| AMS_SYNC_TIMEOUT | 0x0015 | AMS 同步超时 |
| HOST_UNREACHABLE | 0x001B | 主机不可达 |
| WSA_SERVICE_NOT_STARTED | 0x274D | 设备服务未启动 |
| DEVICE_INVALID_GROUP | 0x0700 | 设备无效 IndexGroup |
| DEVICE_INVALID_OFFSET | 0x0701 | 设备无效 IndexOffset |
| DEVICE_INVALID_ACCESS | 0x0702 | 设备访问无效 |
| DEVICE_INVALID_SIZE | 0x0703 | 设备数据大小无效 |
| DEVICE_NOT_READY | 0x0705 | 设备未就绪 |
| DEVICE_BUSY | 0x0706 | 设备忙 |
| DEVICE_NOT_FOUND | 0x0708 | 设备不存在 |
| DEVICE_INVALID_STATE | 0x070C | 设备状态无效 |
| DEVICE_TIMEOUT | 0x0712 | 设备超时 |
| DEVICE_ACCESS_DENIED | 0x071C | 设备访问被拒绝 |
| CLIENT_INVALID_PARAM | 0x0741 | 客户端参数无效 |
| CLIENT_INVOKE_TIMEOUT | 0x0744 | 客户端调用超时 |
| CLIENT_PORT_NOT_OPEN | 0x0745 | 客户端端口未打开 |
完整 70+ 项错误码请参见枚举源码; 上表仅列常用条目。
示例:
from ethercat.slave.aoe import AoEResultCode
result = slave.aoe.read_state()
if result is None:
# 通过 read_device_info / read / write 等返回 None 时
# 后续读取 last_result_code 判断 (若 SDK 暴露) 或直接对照 AoEResultCode
print(f"AoE 失败, 错误码可能为 {AoEResultCode.DEVICE_NOT_READY.name}")
AoE 通知
add_notification()
def add_notification(self, index_group: int, index_offset: int,
length: int, trans_mode: int = 0,
max_delay: int = 0, cycle_time: int = 0) -> Optional[int]
添加 ADS 通知,返回通知句柄。
del_notification()
def del_notification(self, handle: int) -> bool
删除 ADS 通知。
aoe.py 模块中提供 AoESubscriptionManager 类,封装了更便捷的订阅管理功能:
from ethercat import AoESubscriptionManager
mgr = AoESubscriptionManager(slave.aoe)
handle = mgr.subscribe(0x4020, 0, 4, callback=on_data_changed)
mgr.unsubscribe(handle)
mgr.unsubscribe_all()
路由配置
set_config()
def set_config(self, target_net_id: bytes, target_port: int,
source_net_id: bytes, source_port: int) -> bool
设置 AoE 路由配置。NetID 为 6 字节。
get_config()
def get_config(self) -> Optional[dict]
获取 AoE 路由配置,返回字典:
{
'target_net_id': bytes, # 目标 NetID (6 字节)
'target_port': int, # 目标端口
'source_net_id': bytes, # 源 NetID (6 字节)
'source_port': int, # 源端口
}
跨协议网关
通过 AoE 路由访问其他邮箱协议(ETG.1020),支持 CoE 和 SoE 协议的透明转发。
read_coe_via_aoe()
def read_coe_via_aoe(self, index: int, subindex: int,
read_length: int) -> Optional[bytes]
通过 AoE 路由读取 CoE 对象(IndexGroup=0xF302,IndexOffset=(index << 16) | subindex)。
参数:
index(int) — CoE 对象索引subindex(int) — CoE 子索引read_length(int) — 期望读取长度
返回值:
bytes | None— 读取的数据,失败返回None
write_coe_via_aoe()
def write_coe_via_aoe(self, index: int, subindex: int,
data: bytes) -> bool
通过 AoE 路由写入 CoE 对象(IndexGroup=0xF302)。
read_soe_via_aoe()
def read_soe_via_aoe(self, idn: int, read_length: int) -> Optional[bytes]
通过 AoE 路由读取 SoE IDN(IndexGroup=0xF420,IndexOffset=IDN 编号)。
write_soe_via_aoe()
def write_soe_via_aoe(self, idn: int, data: bytes) -> bool
通过 AoE 路由写入 SoE IDN(IndexGroup=0xF420)。
示例:
# 通过 AoE 读取 CoE 对象 0x6041:0(状态字)
data = slave.aoe.read_coe_via_aoe(0x6041, 0, 2)
if data is not None:
import struct
status_word = struct.unpack('<H', data)[0]
print(f"状态字: 0x{status_word:04X}")
# 通过 AoE 写入 CoE 对象 0x6040:0(控制字)
slave.aoe.write_coe_via_aoe(0x6040, 0, struct.pack('<H', 0x000F))
# 通过 AoE 读取 SoE IDN
idn_data = slave.aoe.read_soe_via_aoe(32, 4)
initialize_slave_net_id()
def initialize_slave_net_id(self, net_id: bytes) -> bool
初始化从站 AoE Net ID(ETG.1020 §9.4)。在 INIT→PreOp 状态切换期间调用。
参数:
net_id(bytes) — 6 字节的 AMS Net ID
返回值:
bool— 成功返回True
异常:
ValueError— Net ID 不是 6 字节
示例:
slave.aoe.initialize_slave_net_id(bytes([5, 80, 187, 177, 1, 1]))
高级订阅管理
AoESubscriptionManager 类封装了完整的订阅管理功能,支持按 ID 管理订阅。
AoETransmissionMode
class AoETransmissionMode(IntEnum):
NO_TRANSMISSION = 0 # 无通知
CYCLIC = 1 # 循环通知 - 按指定周期发送
ON_CHANGE = 2 # 变化通知 - 数据变化时发送
CYCLIC_IN_DEVICE = 3 # 设备端循环通知
ON_CHANGE_IN_DEVICE = 4 # 设备端变化通知
AoESubscription
@dataclass
class AoESubscription:
handle: int # 通知句柄
slave_index: int # 从站索引
index_group: int # 索引组
index_offset: int # 索引偏移
length: int # 数据长度
callback: Optional[Callable] # 回调函数
subscription_id: str # 唯一订阅 ID (UUID)
trans_mode: int # 传输模式
is_active: bool # 是否活跃
示例:
from ethercat import AoESubscriptionManager, AoETransmissionMode
mgr = AoESubscriptionManager(slave.aoe)
# 订阅数据变化通知
sub_id = mgr.subscribe(
index_group=0x4020, index_offset=0, length=4,
trans_mode=AoETransmissionMode.ON_CHANGE,
cycle_time_ms=100,
callback=lambda data: print(f"数据变化: {data.hex()}")
)
# 获取活跃订阅列表
active = mgr.get_active_subscriptions()
print(f"活跃订阅数: {mgr.active_count}")
# 按 ID 取消订阅
mgr.unsubscribe(sub_id)
# 取消所有订阅
mgr.unsubscribe_all()
订阅管理器属性
| 属性 | 类型 | 说明 |
|---|---|---|
| active_count | int | 当前活跃订阅数量 |
| all_handles | List[int] | 所有活跃通知句柄列表 |
按句柄 / ID 查询订阅
def get_subscription(self, handle: int) -> Optional[AoESubscription]
def get_subscription_by_id(self, subscription_id: str) -> Optional[AoESubscription]
def unsubscribe_by_handle(self, handle: int) -> bool
get_subscription() 兼容旧句柄风格的查询; get_subscription_by_id() 推荐用于新代码 (UUID 不会与已删除的句柄冲突)。
sub = mgr.get_subscription_by_id(sub_id)
if sub and sub.is_active:
print(f"订阅 slave={sub.slave_index} group=0x{sub.index_group:08X}")
异步订阅与读写 (asyncio)
AoE 邮箱往返延迟通常 10-50 ms, 同步调用会阻塞事件循环。SDK 在专用线程池上提供异步版本, 对齐 C# *Async 接口。
subscribe_async() / unsubscribe_async() / unsubscribe_all_async()
async def subscribe_async(self, index_group: int, index_offset: int,
length: int,
trans_mode: int = AoETransmissionMode.ON_CHANGE,
max_delay_ms: int = 0, cycle_time_ms: int = 100,
callback: Optional[Callable[[bytes], None]] = None,
timeout_ms: Optional[int] = None) -> Optional[str]
async def unsubscribe_async(self, subscription_id: str,
timeout_ms: Optional[int] = None) -> bool
async def unsubscribe_all_async(self,
timeout_ms: Optional[int] = None) -> int
订阅 / 取消订阅的异步版本。timeout_ms 触发 asyncio.TimeoutError, 后台线程仍会跑完单次同步调用。
import asyncio
from ethercat import AoESubscriptionManager, AoETransmissionMode
async def main():
mgr = AoESubscriptionManager(slave.aoe)
sub_id = await mgr.subscribe_async(
index_group=0x4020, index_offset=0, length=4,
trans_mode=AoETransmissionMode.ON_CHANGE,
callback=lambda data: print(f"通知: {data.hex()}"),
timeout_ms=2000,
)
if sub_id is None:
print("订阅失败")
return
await asyncio.sleep(10)
await mgr.unsubscribe_async(sub_id)
asyncio.run(main())
read_async() / write_async() / read_write_async()
from ethercat.slave.aoe import read_async, write_async, read_write_async
async def main():
data = await read_async(slave.aoe, 0x4020, 0, 4, timeout_ms=500)
if data is not None:
print(f"值: {data.hex()}")
await write_async(slave.aoe, 0x4020, 0, b'\x06\x00\x00\x00',
timeout_ms=500)
read_write_async() 优先调用底层 read_write (一次邮箱往返完成读写), 不可用时降级为先 write_async 再 read_async。
shutdown_aoe_async_executor()
from ethercat.slave.aoe import shutdown_aoe_async_executor
shutdown_aoe_async_executor(wait=True)
进程退出前关闭 AoE 协议专用线程池, 等待 (wait=True) 或不等待已提交任务完成。
完整示例
数据读写
if slave.aoe is not None:
import struct
# 设备信息
info = slave.aoe.read_device_info()
if info:
print(f"设备: {info['name']} v{info['major']}.{info['minor']}.{info['build']}")
# 状态
result = slave.aoe.read_state()
if result:
ads_state, device_state = result
name = slave.aoe.get_ads_state_name(ads_state)
print(f"ADS 状态: {name} ({ads_state}), 设备状态: {device_state}")
# 读取数据
data = slave.aoe.read(0x4020, 0, 4)
if data:
print(f"值: 0x{struct.unpack('<I', data)[0]:08X}")
# 写入数据
slave.aoe.write(0x4020, 0, struct.pack('<I', 0x0006))
# 切换设备状态
slave.aoe.write_control(5, 0) # 切换到 Run 状态
数据订阅
from ethercat import AoESubscriptionManager, AoETransmissionMode
if slave.aoe is not None:
mgr = AoESubscriptionManager(slave.aoe)
sub_id = mgr.subscribe(
index_group=0x4020, index_offset=0, length=4,
trans_mode=AoETransmissionMode.ON_CHANGE,
cycle_time_ms=100,
callback=lambda data: print(f"数据变化: {data.hex()}")
)
import time
time.sleep(10)
mgr.unsubscribe_all()
跨协议网关
if slave.aoe is not None:
import struct
# 通过 AoE 读取 CoE 对象 0x6041:0(状态字)
data = slave.aoe.read_coe_via_aoe(0x6041, 0, 2)
if data is not None:
status_word = struct.unpack('<H', data)[0]
print(f"状态字: 0x{status_word:04X}")
# 通过 AoE 写入 CoE 对象 0x6040:0(控制字)
slave.aoe.write_coe_via_aoe(0x6040, 0, struct.pack('<H', 0x000F))
# 通过 AoE 读取 SoE IDN
idn_data = slave.aoe.read_soe_via_aoe(32, 4)
if idn_data is not None:
print(f"IDN 32 值: {struct.unpack('<I', idn_data)[0]}")