跳到主要内容

VoE (Vendor over EtherCAT)

VoE 协议用于厂商自定义数据传输,支持 ETG.1000.6 Mailbox 通信规范(Mailbox Type 0x0F)。每个厂商可以定义自己的 VoE 头格式和数据结构。

通过 slave.voe 访问。从站不支持 VoE 时为 None

属性

属性类型访问说明
default_timeout_msint读写默认超时时间(毫秒),默认 500
voe_header_sizeint只读VoE 头部大小(6 字节:VendorID 4 + VendorType 2)

基本操作

send()

def send(self, vendor_id: int, vendor_type: int, data: bytes,
timeout_ms: Optional[int] = None) -> bool

发送 VoE 数据到从站。

示例:

slave.voe.send(0x00000002, 0x0001, bytes([0x01, 0x02, 0x03]))

receive()

def receive(self, timeout_ms: Optional[int] = None) -> Optional[VoEResponse]

从从站接收 VoE 数据。

send_and_receive()

def send_and_receive(self, vendor_id: int, vendor_type: int, data: bytes,
timeout_ms: Optional[int] = None) -> Optional[VoEResponse]

发送 VoE 数据并等待响应。

VoEResponse 数据类:

@dataclass
class VoEResponse:
vendor_id: int # 厂商 ID
vendor_type: int # 厂商类型
data: bytes # 响应数据
data_length: int # 数据长度

def to_hex_string(self) -> str:
"""数据格式化为十六进制字符串(如 "01 02 03")"""
...

示例:

response = slave.voe.send_and_receive(0x00000002, 0x0001, bytes([0x10]))
if response is not None:
print(f"厂商ID: 0x{response.vendor_id:08X}")
print(f"数据: {response.to_hex_string()}")

原始帧操作

send_raw()

def send_raw(self, frame_data: bytes, timeout_ms: Optional[int] = None) -> bool

发送 VoE 原始帧(用户自行组织帧格式,包括 VoE 头)。

参数:

  • frame_data (bytes) — 完整的 VoE 帧数据(包含 VoE 头 6 字节 + 数据)
  • timeout_ms (int, 可选) — 超时时间(毫秒),默认使用 default_timeout_ms

返回值:

  • bool — 成功返回 True

receive_raw()

def receive_raw(self, timeout_ms: Optional[int] = None) -> Optional[bytes]

接收 VoE 原始帧。

返回值:

  • bytes | None — 原始帧数据,失败返回 None

send_raw_and_receive()

def send_raw_and_receive(self, frame_data: bytes, timeout_ms: Optional[int] = None) -> Optional[bytes]

发送原始帧并等待响应。

返回值:

  • bytes | None — 响应的原始帧数据,失败返回 None

示例:

# 手动构建 VoE 帧
import struct
frame = struct.pack('<IH', 0x00000002, 0x0001) + bytes([0x10, 0x20])
raw_resp = slave.voe.send_raw_and_receive(frame)
if raw_resp is not None:
print(f"原始响应: {raw_resp.hex()}")

辅助方法

build_voe_frame()

def build_voe_frame(self, vendor_id: int, vendor_type: int, data: Optional[bytes] = None) -> bytes

构建标准 VoE 帧(VoE 头 + 数据)。

参数:

  • vendor_id (int) — 厂商 ID (4 字节)
  • vendor_type (int) — 厂商类型 (2 字节)
  • data (bytes, 可选) — 数据载荷

返回值:

  • bytes — 构建的 VoE 帧

parse_voe_frame()

def parse_voe_frame(self, frame: bytes) -> Optional[VoEResponse]

解析 VoE 帧头部。

参数:

  • frame (bytes) — 完整的 VoE 帧

返回值:

  • VoEResponse | None — 解析后的 VoE 响应对象,解析失败返回 None

示例:

# 构建帧
frame = slave.voe.build_voe_frame(0x00000002, 0x0001, bytes([0x10]))

# 解析帧
parsed = slave.voe.parse_voe_frame(frame)
if parsed is not None:
print(f"厂商ID: 0x{parsed.vendor_id:08X}")
print(f"类型: 0x{parsed.vendor_type:04X}")
print(f"数据: {parsed.to_hex_string()}")

完整示例

厂商命令交互

if slave.voe is not None:
response = slave.voe.send_and_receive(
vendor_id=0x00000002,
vendor_type=0x0001,
data=bytes([0x01, 0x00])
)

if response is not None:
print(str(response))

批量数据传输

if slave.voe is not None:
payload = bytes(1024)
slave.voe.send(0x00000002, 0x1000, payload)

reply = slave.voe.receive()
if reply is not None:
print(f"收到 {reply.data_length} 字节响应")

厂商通知监听 (Notification Listener)

部分厂商通过 VoE 邮箱主动推送状态变化, SDK 在 native 层提供常驻监听线程, 通过 start_notification_listener() 订阅本从站的全部 vendor 通知, 收到时通过 Python 回调分发。

VoENotificationEventArgs

from ethercat.slave.voe import VoENotificationEventArgs

class VoENotificationEventArgs:
slave_index: int # 触发通知的从站编号
vendor_id: int # 厂商 ID (0x00000002 = Beckhoff)
vendor_type: int # 厂商命令类型
data: bytes # 通知载荷 (可能为空)
timestamp: float # time.time() 时间戳

start_notification_listener()

def start_notification_listener(self) -> bool

启动 master 监听线程 + 订阅本从站的全部 vendor 通知 (对齐 C# StartNotificationListener)。重入安全, 重复调用幂等。

返回值:

  • True — 启动成功; 通知会通过 add_notification_listener() 注册的回调分发
  • False — DLL 未导出 VOEStartNotificationListener / VOERegisterNotification, 或 native 注册失败

stop_notification_listener()

def stop_notification_listener(self) -> None

注销本从站订阅 (对齐 C# StopNotificationListener)。关闭 master 全局监听线程, 同 master 上其他从站仍可继续订阅。

stop_all_listeners()

def stop_all_listeners(self) -> None

注销当前 master 上的所有订阅 + 关闭 master 监听线程 (对齐 C# StopAllListeners)。

is_listener_running

@property
def is_listener_running(self) -> bool

监听线程是否正在运行 (DLL 全局状态, 任何已订阅的 VoE 实例返回相同结果)。

add_notification_listener() / remove_notification_listener()

def add_notification_listener(self, listener: Callable[[VoENotificationEventArgs], None]) -> None
def remove_notification_listener(self, listener: Callable[[VoENotificationEventArgs], None]) -> None

注册 / 注销 Python 端的通知回调。允许多个回调同时存在, 单个回调异常不会影响其他回调。

示例:

from ethercat.slave.voe import VoENotificationEventArgs

def on_voe_notify(args: VoENotificationEventArgs):
print(f"[VoE] slave={args.slave_index} "
f"vendor=0x{args.vendor_id:08X} "
f"type=0x{args.vendor_type:04X} "
f"data={args.data.hex()}")

slave.voe.add_notification_listener(on_voe_notify)
if not slave.voe.start_notification_listener():
print("启动监听失败")
else:
print(f"监听运行中: {slave.voe.is_listener_running}")
# ... 接收通知 ...
slave.voe.stop_notification_listener()
slave.voe.remove_notification_listener(on_voe_notify)
回调跑在 native 监听线程

通知回调跑在 DLL 监听线程, 异常会被 SDK 吞掉避免崩溃, 但耗时操作会拖慢通知分发。建议在回调内仅做轻量记录 / 入队, 真正处理转入业务线程。