跳到主要内容

PDO 输入输出

通过 slave.pdo 访问 SlavePdo 实例(slave.pdo 属性提供 PDO 数据读写接口)。

PDO 周期与邮箱的关系

邮箱通信自动附带在 PDO 周期中,不占用额外帧。邮箱响应延迟约 2-3 个 PDO 周期。

推荐

高性能场景使用 ctypes 结构体映射(零拷贝),快速原型使用 类型化读写

ctypes 结构体映射(零拷贝)

bind_pdo_struct()

def bind_pdo_struct(self, struct_type, is_input: bool = True) -> Optional[ctypes.Structure]

将 PDO 映射绑定到用户 ctypes.Structure(零拷贝,直接指向 IOmap 内存)。

参数:

  • struct_type — ctypes.Structure 子类
  • is_input (bool) — True=绑定输入数据(只读),False=绑定输出数据(可写)

返回值:

  • 指向 IOmap 的 struct_type 实例,失败返回 None

说明: 结构体大小必须不超过对应 PDO 数据区大小。返回的实例直接指向 IOmap 内存,读写操作零拷贝。

示例(输入结构体):

import ctypes

class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]

input_ref = slave.pdo.bind_pdo_struct(ServoInput, is_input=True)
if input_ref:
print(f"当前位置: {input_ref.actual_position}")

示例(输出结构体):

class ServoOutput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("control_word", ctypes.c_uint16),
("target_position", ctypes.c_int32),
("target_velocity", ctypes.c_int32),
]

output_ref = slave.pdo.bind_pdo_struct(ServoOutput, is_input=False)
if output_ref:
output_ref.control_word = 0x000F
output_ref.target_position = 10000
ctypes 结构体定义注意事项
  • 必须设置 _pack_ = 1 以匹配 EtherCAT 数据紧密排列
  • 字段顺序必须与 PDO 映射顺序一致
  • 结构体大小必须与 PDO 数据区大小匹配(可通过 slave.ibytes / slave.obytes 查看)

字节数组访问

inputs / outputs

@property
def inputs(self) -> Optional[bytes] # 只读,输入 PDO 原始字节

@property
def outputs(self) -> Optional[bytes] # 只读,输出 PDO 原始字节

def write_outputs(self, data: bytes) -> bool # 写入输出 PDO 字节

读取 PDO 字节数组。每次访问从 IOmap 读取最新数据。写入使用 write_outputs() 方法。

示例:

# 读取输入 PDO
raw_input = slave.pdo.inputs
if raw_input:
import struct
status_word = struct.unpack_from('<H', raw_input, 0)[0]
print(f"StatusWord = 0x{status_word:04X}")

# 读取输出 PDO
raw_output = slave.pdo.outputs
if raw_output:
print(f"当前输出: {raw_output.hex()}")

# 写入输出 PDO
slave.pdo.write_outputs(bytes([0x0F, 0x00, 0x10, 0x27, 0x00, 0x00]))

直接读写

提示

以下方法适用于需要精细控制偏移和长度的场景。

read_input_data_direct() / write_output_data_direct()

def read_input_data_direct(self, buffer: bytearray, offset: int = 0, length: int = 0) -> int
def write_output_data_direct(self, data: bytes, offset: int = 0, length: int = 0) -> int

直接从 IOmap 读取输入数据 / 写入输出数据,支持指定缓冲区偏移和字节数。

参数:

  • buffer / data — 目标/源缓冲区
  • offset (int) — 缓冲区偏移(默认 0)
  • length (int) — 要读/写的字节数(0 = 全部)

返回值:

  • int — 实际读取/写入的字节数

示例:

# 读取输入数据到缓冲区
input_buffer = bytearray(slave.ibytes)
bytes_read = slave.pdo.read_input_data_direct(input_buffer)

# 写入输出数据
output_data = bytes([0x0F, 0x00, 0x00, 0x00])
bytes_written = slave.pdo.write_output_data_direct(output_data)

原始指针访问

注意

以下 API 直接操作内存指针,仅适用于极致性能场景。指针在 IOmap 重新映射后会失效,请确保仅在 PDO 循环期间使用。

get_input_data_pointer() / get_output_data_pointer()

def get_input_data_pointer(self) -> int
def get_output_data_pointer(self) -> int

获取输入/输出数据的直接指针(零拷贝访问)。失败时返回 0

示例:

import ctypes

input_ptr = slave.pdo.get_input_data_pointer()
if input_ptr:
# 通过 ctypes 读取数据
status_word = ctypes.cast(input_ptr, ctypes.POINTER(ctypes.c_uint16))[0]
print(f"StatusWord = 0x{status_word:04X}")

memoryview 访问

def get_input_memoryview(self) -> memoryview   # 输入 PDO 零拷贝视图
def get_output_memoryview(self) -> memoryview # 输出 PDO 零拷贝视图

获取 PDO 数据的 memoryview(零拷贝访问)。仅在 PDO 循环中使用,IOmap 重新映射后视图会失效。

示例:

mv_in = slave.pdo.get_input_memoryview()
if len(mv_in) > 0:
import struct
status_word = struct.unpack_from('<H', mv_in, 0)[0]
print(f"StatusWord = 0x{status_word:04X}")

mv_out = slave.pdo.get_output_memoryview()
if len(mv_out) > 0:
mv_out[0] = 0xFF # 直接写入

PDO 映射信息查询

@property
def inputs_mapping_info(self) -> dict # {'pointer': int, 'size': int, 'slave_index': int}

@property
def outputs_mapping_info(self) -> dict # {'pointer': int, 'size': int, 'slave_index': int}

获取输入/输出 PDO 映射信息,包含指针地址、数据大小和从站索引。

类型化读写

读取输入 PDO

SlavePdo 提供类型化读取方法,按偏移读取输入 PDO 数据:

# 按类型读取输入 PDO(offset 为字节偏移量)
status_word = slave.pdo.read_uint16(0) # 偏移 0, 无符号 16 位
actual_pos = slave.pdo.read_int32(2) # 偏移 2, 有符号 32 位
temperature = slave.pdo.read_float(6) # 偏移 6, 32 位浮点
digital_inputs = slave.pdo.read_uint8(10) # 偏移 10, 无符号 8 位
velocity = slave.pdo.read_int64(11) # 偏移 11, 有符号 64 位
name = slave.pdo.read_string(19, 16) # 偏移 19, 最大 16 字符

写入输出 PDO

类型化写入方法,按偏移写入输出 PDO 数据:

# 按类型写入输出 PDO(offset 为字节偏移量)
slave.pdo.write_uint16(0, 0x000F) # ControlWord
slave.pdo.write_int32(2, 100000) # TargetPosition
slave.pdo.write_float(6, 3.14) # 浮点参数
slave.pdo.write_uint8(10, 0xFF) # 数字输出
slave.pdo.write_int64(11, 999999) # 64 位整数
slave.pdo.write_string(19, "hello", 16) # 字符串 (含 null 填充)

支持的类型

  • read_uint8(offset) / write_uint8(offset, val) — 无符号 8 位 (1 字节)
  • read_int8(offset) / write_int8(offset, val) — 有符号 8 位 (1 字节)
  • read_uint16(offset) / write_uint16(offset, val) — 无符号 16 位 (2 字节)
  • read_int16(offset) / write_int16(offset, val) — 有符号 16 位 (2 字节)
  • read_uint32(offset) / write_uint32(offset, val) — 无符号 32 位 (4 字节)
  • read_int32(offset) / write_int32(offset, val) — 有符号 32 位 (4 字节)
  • read_uint64(offset) / write_uint64(offset, val) — 无符号 64 位 (8 字节)
  • read_int64(offset) / write_int64(offset, val) — 有符号 64 位 (8 字节)
  • read_float(offset) / write_float(offset, val) — 32 位浮点 (4 字节)
  • read_double(offset) / write_double(offset, val) — 64 位双精度 (8 字节)
  • read_string(offset, len) / write_string(offset, val, len) — 字符串 (可变长度)

批量读写

batch_read() / batch_write()

def batch_read(self, requests: list) -> list
def batch_write(self, requests: list) -> int

批量读写 PDO 数据,减少多次调用的开销。

参数:

  • batch_read: requests[(offset, size), ...] 列表,返回 [bytes | None, ...]
  • batch_write: requests[(offset, data), ...] 列表,返回成功写入的项数量

示例:

# 批量读取
results = slave.pdo.batch_read([
(0, 2), # 偏移 0, 读 2 字节 (StatusWord)
(2, 4), # 偏移 2, 读 4 字节 (ActualPosition)
(6, 4), # 偏移 6, 读 4 字节 (ActualVelocity)
])
for data in results:
if data:
print(data.hex())

# 批量写入
count = slave.pdo.batch_write([
(0, struct.pack('<H', 0x000F)), # ControlWord
(2, struct.pack('<i', 100000)), # TargetPosition
])
print(f"成功写入 {count} 项")

PDOArrayInstance(索引访问)

通过 PDOArrayInstance 按字节索引访问 PDO 数据,支持类型化属性。

from ethercat.slave.pdo import PDOArrayInstance

# 获取输入/输出数据指针后创建实例
in_ptr = slave.pdo.get_input_data_pointer()
out_ptr = slave.pdo.get_output_data_pointer()

if in_ptr:
pdo_in = PDOArrayInstance(in_ptr, slave.ibytes, is_input=True)
item = pdo_in[0] # 获取 PDODataItem
value = item.content # 读取字节值 (0-255)
status = item.as_uint16 # 读取 uint16
pos = pdo_in[2].as_int32 # 读取偏移 2 处的 int32

if out_ptr:
pdo_out = PDOArrayInstance(out_ptr, slave.obytes, is_input=False)
pdo_out[0].content = 0x0F # 写入字节值
pdo_out[0].as_uint16 = 0x000F # 写入 uint16
pdo_out[2].as_int32 = 100000 # 写入 int32

PDODataItem 类型化属性

  • content (int 0-255) — 字节值读写
  • as_int16 (int) — 有符号 16 位
  • as_uint16 (int) — 无符号 16 位
  • as_int32 (int) — 有符号 32 位
  • as_uint32 (int) — 无符号 32 位
  • as_float (float) — 32 位浮点
  • as_double (float) — 64 位双精度

位操作方法:

  • get_bit(bit_index) — 获取指定位值(0-7)
  • set_bit(bit_index, value) — 设置指定位值(仅输出 PDO)

完整示例

import ctypes
from ethercat import DcSyncMode, EcState

class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]

class ServoOutput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("control_word", ctypes.c_uint16),
("target_position", ctypes.c_int32),
]

slave = master[1] # 1-based 索引

# ===== ctypes 结构体映射(推荐,零拷贝) =====
input_ref = slave.pdo.bind_pdo_struct(ServoInput, is_input=True)
output_ref = slave.pdo.bind_pdo_struct(ServoOutput, is_input=False)

if input_ref:
print(f"状态字: 0x{input_ref.status_word:04X}")
print(f"当前位置: {input_ref.actual_position}")
if output_ref:
output_ref.control_word = 0x000F
output_ref.target_position = 10000

# ===== 类型化读写 =====
status = slave.pdo.read_uint16(0)
position = slave.pdo.read_int32(2)
slave.pdo.write_uint16(0, 0x000F)
slave.pdo.write_int32(2, 100000)

# ===== 字节数组 =====
raw_input = slave.pdo.inputs
slave.pdo.write_outputs(bytes([0x0F, 0x00, 0x10, 0x27, 0x00, 0x00]))

# ===== memoryview 零拷贝 =====
mv = slave.pdo.get_input_memoryview()
if len(mv) >= 2:
import struct
sw = struct.unpack_from('<H', mv, 0)[0]
print(f"StatusWord (memoryview) = 0x{sw:04X}")

# ===== 直接读写 =====
buf = bytearray(slave.ibytes)
slave.pdo.read_input_data_direct(buf)

# ===== 批量操作 =====
results = slave.pdo.batch_read([(0, 2), (2, 4)])