VoE (Vendor over EtherCAT)
VoE 协议用于厂商自定义数据传输,支持 ETG.1000.6 Mailbox 通信规范(Mailbox Type 0x0F)。每个厂商可以定义自己的 VoE 头格式和数据结构。
通过 slave.voe 访问。从站不支持 VoE 时为 None。
属性
| 属性 | 类型 | 访问 | 说明 |
|---|---|---|---|
| default_timeout_ms | int | 读写 | 默认超时时间(毫秒),默认 500 |
| voe_header_size | int | 只读 | VoE 头部大小(6 字节:VendorID 4 + VendorType 2) |
基本操作
send()
def send(self, vendor_id: int, vendor_type: int, data: bytes,
timeout_ms: Optional[int] = None) -> bool
发送 VoE 数据到从站。
示例:
slave.voe.send(0x00000002, 0x0001, bytes([0x01, 0x02, 0x03]))
receive()
def receive(self, timeout_ms: Optional[int] = None) -> Optional[VoEResponse]
从从站接收 VoE 数据。
send_and_receive()
def send_and_receive(self, vendor_id: int, vendor_type: int, data: bytes,
timeout_ms: Optional[int] = None) -> Optional[VoEResponse]
发送 VoE 数据并等待响应。
VoEResponse 数据类:
@dataclass
class VoEResponse:
vendor_id: int # 厂商 ID
vendor_type: int # 厂商类型
data: bytes # 响应数据
data_length: int # 数据长度
def to_hex_string(self) -> str:
"""数据格式化为十六进制字符串(如 "01 02 03")"""
...
示例:
response = slave.voe.send_and_receive(0x00000002, 0x0001, bytes([0x10]))
if response is not None:
print(f"厂商ID: 0x{response.vendor_id:08X}")
print(f"数据: {response.to_hex_string()}")
原始帧操作
send_raw()
def send_raw(self, frame_data: bytes, timeout_ms: Optional[int] = None) -> bool
发送 VoE 原始帧(用户自行组织帧格式,包括 VoE 头)。
参数:
frame_data(bytes) — 完整的 VoE 帧数据(包含 VoE 头 6 字节 + 数据)timeout_ms(int, 可选) — 超时时间(毫秒),默认使用default_timeout_ms
返回值:
bool— 成功返回True
receive_raw()
def receive_raw(self, timeout_ms: Optional[int] = None) -> Optional[bytes]
接收 VoE 原始帧。
返回值:
bytes | None— 原始帧数据,失败返回None
send_raw_and_receive()
def send_raw_and_receive(self, frame_data: bytes, timeout_ms: Optional[int] = None) -> Optional[bytes]
发送原始帧并等待响应。
返回值:
bytes | None— 响应的原始帧数据,失败返回None
示例:
# 手动构建 VoE 帧
import struct
frame = struct.pack('<IH', 0x00000002, 0x0001) + bytes([0x10, 0x20])
raw_resp = slave.voe.send_raw_and_receive(frame)
if raw_resp is not None:
print(f"原始响应: {raw_resp.hex()}")
辅助方法
build_voe_frame()
def build_voe_frame(self, vendor_id: int, vendor_type: int, data: Optional[bytes] = None) -> bytes
构建标准 VoE 帧(VoE 头 + 数据)。
参数:
vendor_id(int) — 厂商 ID (4 字节)vendor_type(int) — 厂商类型 (2 字节)data(bytes, 可选) — 数据载荷
返回值:
bytes— 构建的 VoE 帧
parse_voe_frame()
def parse_voe_frame(self, frame: bytes) -> Optional[VoEResponse]
解析 VoE 帧头部。
参数:
frame(bytes) — 完整的 VoE 帧
返回值:
VoEResponse | None— 解析后的 VoE 响应对象,解析失败返回None
示例:
# 构建帧
frame = slave.voe.build_voe_frame(0x00000002, 0x0001, bytes([0x10]))
# 解析帧
parsed = slave.voe.parse_voe_frame(frame)
if parsed is not None:
print(f"厂商ID: 0x{parsed.vendor_id:08X}")
print(f"类型: 0x{parsed.vendor_type:04X}")
print(f"数据: {parsed.to_hex_string()}")
完整示例
厂商命令交互
if slave.voe is not None:
response = slave.voe.send_and_receive(
vendor_id=0x00000002,
vendor_type=0x0001,
data=bytes([0x01, 0x00])
)
if response is not None:
print(str(response))
批量数据传输
if slave.voe is not None:
payload = bytes(1024)
slave.voe.send(0x00000002, 0x1000, payload)
reply = slave.voe.receive()
if reply is not None:
print(f"收到 {reply.data_length} 字节响应")
厂商通知监听 (Notification Listener)
部分厂商通过 VoE 邮箱主动推送状态变化, SDK 在 native 层提供常驻监听线程, 通过 start_notification_listener() 订阅本从站的全部 vendor 通知, 收到时通过 Python 回调分发。
VoENotificationEventArgs
from ethercat.slave.voe import VoENotificationEventArgs
class VoENotificationEventArgs:
slave_index: int # 触发通知的从站编号
vendor_id: int # 厂商 ID (0x00000002 = Beckhoff)
vendor_type: int # 厂商命令类型
data: bytes # 通知载荷 (可能为空)
timestamp: float # time.time() 时间戳
start_notification_listener()
def start_notification_listener(self) -> bool
启动 master 监听线程 + 订阅本从站的全部 vendor 通知 (对齐 C# StartNotificationListener)。重入安全, 重复调用幂等。
返回值:
True— 启动成功; 通知会通过add_notification_listener()注册的回调分发False— DLL 未导出VOEStartNotificationListener/VOERegisterNotification, 或 native 注册失败
stop_notification_listener()
def stop_notification_listener(self) -> None
注销本从站订阅 (对齐 C# StopNotificationListener)。不关闭 master 全局监听线程, 同 master 上其他从站仍可继续订阅。
stop_all_listeners()
def stop_all_listeners(self) -> None
注销当前 master 上的所有订阅 + 关闭 master 监听线程 (对齐 C# StopAllListeners)。
is_listener_running
@property
def is_listener_running(self) -> bool
监听线程是否正在运行 (DLL 全局状态, 任何已订阅的 VoE 实例返回相同结果)。
add_notification_listener() / remove_notification_listener()
def add_notification_listener(self, listener: Callable[[VoENotificationEventArgs], None]) -> None
def remove_notification_listener(self, listener: Callable[[VoENotificationEventArgs], None]) -> None
注册 / 注销 Python 端的通知回调。允许多个回调同时存在, 单个回调异常不会影响其他回调。
示例:
from ethercat.slave.voe import VoENotificationEventArgs
def on_voe_notify(args: VoENotificationEventArgs):
print(f"[VoE] slave={args.slave_index} "
f"vendor=0x{args.vendor_id:08X} "
f"type=0x{args.vendor_type:04X} "
f"data={args.data.hex()}")
slave.voe.add_notification_listener(on_voe_notify)
if not slave.voe.start_notification_listener():
print("启动监听失败")
else:
print(f"监听运行中: {slave.voe.is_listener_running}")
# ... 接收通知 ...
slave.voe.stop_notification_listener()
slave.voe.remove_notification_listener(on_voe_notify)
通知回调跑在 DLL 监听线程, 异常会被 SDK 吞掉避免崩溃, 但耗时操作会拖慢通知分发。建议在回调内仅做轻量记录 / 入队, 真正处理转入业务线程。