PDO 输入输出
通过 slave.pdo 访问 SlavePdo 实例(slave.pdo 属性提供 PDO 数据读写接口)。
PDO 周期与邮箱的关系
邮箱通信自动附带在 PDO 周期中,不占用额外帧。邮箱响应延迟约 2-3 个 PDO 周期。
推荐
高性能场景使用 ctypes 结构体映射(零拷贝),快速原型使用 类型化读写。
ctypes 结构体映射(零拷贝)
bind_pdo_struct()
def bind_pdo_struct(self, struct_type, is_input: bool = True) -> Optional[ctypes.Structure]
将 PDO 映射绑定到用户 ctypes.Structure(零拷贝,直接指向 IOmap 内存)。
参数:
struct_type— ctypes.Structure 子类is_input(bool) —True=绑定输入数据(只读),False=绑定输出数据(可写)
返回值:
- 指向 IOmap 的 struct_type 实例,失败返回
None
说明: 结构体大小必须不超过对应 PDO 数据区大小。返回的实例直接指向 IOmap 内存,读写操作零拷贝。
示例(输入结构体):
import ctypes
class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]
input_ref = slave.pdo.bind_pdo_struct(ServoInput, is_input=True)
if input_ref:
print(f"当前位置: {input_ref.actual_position}")
示例(输出结构体):
class ServoOutput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("control_word", ctypes.c_uint16),
("target_position", ctypes.c_int32),
("target_velocity", ctypes.c_int32),
]
output_ref = slave.pdo.bind_pdo_struct(ServoOutput, is_input=False)
if output_ref:
output_ref.control_word = 0x000F
output_ref.target_position = 10000
ctypes 结构体定义注意事项
- 必须设置
_pack_ = 1以匹配 EtherCAT 数据紧密排列 - 字段顺序必须与 PDO 映射顺序一致
- 结构体大小必须与 PDO 数据区大小匹配(可通过
slave.ibytes/slave.obytes查看)
字节数组访问
inputs / outputs
@property
def inputs(self) -> Optional[bytes] # 只读,输入 PDO 原始字节
@property
def outputs(self) -> Optional[bytes] # 只读,输出 PDO 原始字节
def write_outputs(self, data: bytes) -> bool # 写入输出 PDO 字节
读取 PDO 字节数组。每次访问从 IOmap 读取最新数据。写入使用 write_outputs() 方法。
示例:
# 读取输入 PDO
raw_input = slave.pdo.inputs
if raw_input:
import struct
status_word = struct.unpack_from('<H', raw_input, 0)[0]
print(f"StatusWord = 0x{status_word:04X}")
# 读取输出 PDO
raw_output = slave.pdo.outputs
if raw_output:
print(f"当前输出: {raw_output.hex()}")
# 写入输出 PDO
slave.pdo.write_outputs(bytes([0x0F, 0x00, 0x10, 0x27, 0x00, 0x00]))
直接读写
提示
以下方法适用于需要精细控制偏移和长度的场景。
read_input_data_direct() / write_output_data_direct()
def read_input_data_direct(self, buffer: bytearray, offset: int = 0, length: int = 0) -> int
def write_output_data_direct(self, data: bytes, offset: int = 0, length: int = 0) -> int
直接从 IOmap 读取输入数据 / 写入输出数据,支持指定缓冲区偏移和字节数。
参数:
buffer/data— 目标/源缓冲区offset(int) — 缓冲区偏移(默认 0)length(int) — 要读/写的字节数(0 = 全部)
返回值:
int— 实际读取/写入的字节数
示例:
# 读取输入数据到缓冲区
input_buffer = bytearray(slave.ibytes)
bytes_read = slave.pdo.read_input_data_direct(input_buffer)
# 写入输出数据
output_data = bytes([0x0F, 0x00, 0x00, 0x00])
bytes_written = slave.pdo.write_output_data_direct(output_data)
原始指针访问
注意
以下 API 直接操作内存指针,仅适用于极致性能场景。指针在 IOmap 重新映射后会失效,请确保仅在 PDO 循环期间使用。
get_input_data_pointer() / get_output_data_pointer()
def get_input_data_pointer(self) -> int
def get_output_data_pointer(self) -> int
获取输入/输出数据的直接指针(零拷贝访问)。失败时返回 0。
示例:
import ctypes
input_ptr = slave.pdo.get_input_data_pointer()
if input_ptr:
# 通过 ctypes 读取数据
status_word = ctypes.cast(input_ptr, ctypes.POINTER(ctypes.c_uint16))[0]
print(f"StatusWord = 0x{status_word:04X}")
memoryview 访问
def get_input_memoryview(self) -> memoryview # 输入 PDO 零拷贝视图
def get_output_memoryview(self) -> memoryview # 输出 PDO 零拷贝视图
获取 PDO 数据的 memoryview(零拷贝访问)。仅在 PDO 循环中使用,IOmap 重新映射后视图会失效。
示例:
mv_in = slave.pdo.get_input_memoryview()
if len(mv_in) > 0:
import struct
status_word = struct.unpack_from('<H', mv_in, 0)[0]
print(f"StatusWord = 0x{status_word:04X}")
mv_out = slave.pdo.get_output_memoryview()
if len(mv_out) > 0:
mv_out[0] = 0xFF # 直接写入
PDO 映射信息查询
@property
def inputs_mapping_info(self) -> dict # {'pointer': int, 'size': int, 'slave_index': int}
@property
def outputs_mapping_info(self) -> dict # {'pointer': int, 'size': int, 'slave_index': int}
获取输入/输出 PDO 映射信息,包含指针地址、数据大小和从站索引。
类型化读写
读取输入 PDO
SlavePdo 提供类型化读取方法,按偏移读取输入 PDO 数据:
# 按类型读取输入 PDO(offset 为字节偏移量)
status_word = slave.pdo.read_uint16(0) # 偏移 0, 无符号 16 位
actual_pos = slave.pdo.read_int32(2) # 偏移 2, 有符号 32 位
temperature = slave.pdo.read_float(6) # 偏移 6, 32 位浮点
digital_inputs = slave.pdo.read_uint8(10) # 偏移 10, 无符号 8 位
velocity = slave.pdo.read_int64(11) # 偏移 11, 有符号 64 位
name = slave.pdo.read_string(19, 16) # 偏移 19, 最大 16 字符
写入输出 PDO
类型化写入方法,按偏移写入输出 PDO 数据:
# 按类型写入输出 PDO(offset 为字节偏移量)
slave.pdo.write_uint16(0, 0x000F) # ControlWord
slave.pdo.write_int32(2, 100000) # TargetPosition
slave.pdo.write_float(6, 3.14) # 浮点参数
slave.pdo.write_uint8(10, 0xFF) # 数字输出
slave.pdo.write_int64(11, 999999) # 64 位整数
slave.pdo.write_string(19, "hello", 16) # 字符串 (含 null 填充)
支持的类型
read_uint8(offset)/write_uint8(offset, val)— 无符号 8 位 (1 字节)read_int8(offset)/write_int8(offset, val)— 有符号 8 位 (1 字节)read_uint16(offset)/write_uint16(offset, val)— 无符号 16 位 (2 字节)read_int16(offset)/write_int16(offset, val)— 有符号 16 位 (2 字节)read_uint32(offset)/write_uint32(offset, val)— 无符号 32 位 (4 字节)read_int32(offset)/write_int32(offset, val)— 有符号 32 位 (4 字节)read_uint64(offset)/write_uint64(offset, val)— 无符号 64 位 (8 字节)read_int64(offset)/write_int64(offset, val)— 有符号 64 位 (8 字节)read_float(offset)/write_float(offset, val)— 32 位浮点 (4 字节)read_double(offset)/write_double(offset, val)— 64 位双精度 (8 字节)read_string(offset, len)/write_string(offset, val, len)— 字符串 (可变长度)
批量读写
batch_read() / batch_write()
def batch_read(self, requests: list) -> list
def batch_write(self, requests: list) -> int
批量读写 PDO 数据,减少多次调用的开销。
参数:
batch_read:requests为[(offset, size), ...]列表,返回[bytes | None, ...]batch_write:requests为[(offset, data), ...]列表,返回成功写入的项数量
示例:
# 批量读取
results = slave.pdo.batch_read([
(0, 2), # 偏移 0, 读 2 字节 (StatusWord)
(2, 4), # 偏移 2, 读 4 字节 (ActualPosition)
(6, 4), # 偏移 6, 读 4 字节 (ActualVelocity)
])
for data in results:
if data:
print(data.hex())
# 批量写入
count = slave.pdo.batch_write([
(0, struct.pack('<H', 0x000F)), # ControlWord
(2, struct.pack('<i', 100000)), # TargetPosition
])
print(f"成功写入 {count} 项")
PDOArrayInstance(索引访问)
通过 PDOArrayInstance 按字节索引访问 PDO 数据,支持类型化属性。
from ethercat.slave.pdo import PDOArrayInstance
# 获取输入/输出数据指针后创建实例
in_ptr = slave.pdo.get_input_data_pointer()
out_ptr = slave.pdo.get_output_data_pointer()
if in_ptr:
pdo_in = PDOArrayInstance(in_ptr, slave.ibytes, is_input=True)
item = pdo_in[0] # 获取 PDODataItem
value = item.content # 读取字节值 (0-255)
status = item.as_uint16 # 读取 uint16
pos = pdo_in[2].as_int32 # 读取偏移 2 处的 int32
if out_ptr:
pdo_out = PDOArrayInstance(out_ptr, slave.obytes, is_input=False)
pdo_out[0].content = 0x0F # 写入字节值
pdo_out[0].as_uint16 = 0x000F # 写入 uint16
pdo_out[2].as_int32 = 100000 # 写入 int32
PDODataItem 类型化属性
content(int 0-255) — 字节值读写as_int16(int) — 有符号 16 位as_uint16(int) — 无符号 16 位as_int32(int) — 有符号 32 位as_uint32(int) — 无符号 32 位as_float(float) — 32 位浮点as_double(float) — 64 位双精度
位操作方法:
get_bit(bit_index)— 获取指定位值(0-7)set_bit(bit_index, value)— 设置指定位值(仅输出 PDO)
完整示例
import ctypes
from ethercat import DcSyncMode, EcState
class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]
class ServoOutput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("control_word", ctypes.c_uint16),
("target_position", ctypes.c_int32),
]
slave = master[1] # 1-based 索引
# ===== ctypes 结构体映射(推荐,零拷贝) =====
input_ref = slave.pdo.bind_pdo_struct(ServoInput, is_input=True)
output_ref = slave.pdo.bind_pdo_struct(ServoOutput, is_input=False)
if input_ref:
print(f"状态字: 0x{input_ref.status_word:04X}")
print(f"当前位置: {input_ref.actual_position}")
if output_ref:
output_ref.control_word = 0x000F
output_ref.target_position = 10000
# ===== 类型化读写 =====
status = slave.pdo.read_uint16(0)
position = slave.pdo.read_int32(2)
slave.pdo.write_uint16(0, 0x000F)
slave.pdo.write_int32(2, 100000)
# ===== 字节数组 =====
raw_input = slave.pdo.inputs
slave.pdo.write_outputs(bytes([0x0F, 0x00, 0x10, 0x27, 0x00, 0x00]))
# ===== memoryview 零拷贝 =====
mv = slave.pdo.get_input_memoryview()
if len(mv) >= 2:
import struct
sw = struct.unpack_from('<H', mv, 0)[0]
print(f"StatusWord (memoryview) = 0x{sw:04X}")
# ===== 直接读写 =====
buf = bytearray(slave.ibytes)
slave.pdo.read_input_data_direct(buf)
# ===== 批量操作 =====
results = slave.pdo.batch_read([(0, 2), (2, 4)])