Skip to content

Python — 消息类型

消息元数据类和构造函数分布在 nnrp.core.messages.controlnnrp.core.messages.data 中,通过 nnrp.core 统一再导出。

导入

python
from nnrp.core import (
    # 控制消息
    ClientHelloMetadata, ServerHelloAckMetadata,
    SessionPatchMetadata, SessionPatchAckMetadata,
    FlowUpdateMetadata, ResultHintMetadata,
    TransportProbeMetadata, TransportProbeAckMetadata,
    SessionMigrateMetadata, SessionMigrateAckMetadata,
    CachePutMetadata, CacheAckMetadata, CacheInvalidateMetadata,
    ErrorMetadata,
    # 控制扩展
    ControlExtensionEntry,
    ClientHelloTransportPolicyExtension, ServerHelloAckTransportPolicyExtension,
    ClientHelloLossToleranceExtension, ServerHelloAckLossToleranceExtension,
    ClientHelloPayloadCapabilitiesExtension, ServerHelloAckPayloadCapabilitiesExtension,
    # 数据消息
    FrameSubmitMetadata, ResultPushMetadata,
    # 构造函数
    build_frame_submit_packet, build_result_push_packet,
    build_result_drop_packet, build_frame_cancel_packet,
    build_ping_packet, build_pong_packet,
    build_flow_update_packet, build_result_hint_packet,
    build_result_push_mixed_packet, build_result_push_typed_payload_packet,
    # 解包工具
    unpack_tensor_body, unpack_tile_index_block,
    unpack_control_extension_block,
)

控制消息元数据

所有控制消息元数据类均实现 .pack() -> bytes@classmethod .unpack(data: bytes) -> Self 方法。

ClientHelloMetadata

客户端握手消息元数据。

字段类型说明
session_idint客户端分配的会话 ID
max_viewsint最大并发视角数
enable_cachebool是否启用服务端缓存
payload_kind_maskPayloadKind客户端支持的载荷类型掩码

ServerHelloAckMetadata

服务端握手应答元数据。

字段类型说明
session_idint服务端确认的会话 ID
transport_idTransportId选定的传输类型
enable_cachebool服务端是否启用了缓存
max_cache_entriesint最大缓存条目数
max_cache_bytesint最大缓存字节数
payload_kind_maskPayloadKind服务端支持的载荷类型掩码

SessionPatchMetadata

会话动态补丁请求元数据。

字段类型说明
patch_fieldsSessionPatchField要更新的字段位掩码
target_cadenceint目标帧率
quality_tierint质量档位
active_lane_maskint活跃视角掩码
preferred_codecint首选编解码器 ID
preferred_compressionint首选压缩级别

SessionPatchAckMetadata

会话补丁应答元数据。

字段类型说明
patch_fieldsSessionPatchField已成功应用的字段
statusSessionPatchAckStatus应用状态
reject_reasonSessionPatchRejectReason拒绝原因(仅拒绝时有效)

ErrorMetadata

协议错误消息元数据。

字段类型说明
error_codeErrorCode错误码
error_scopeErrorScope错误作用域
session_idint相关会话 ID
frame_idint相关帧 ID

FlowUpdateMetadata

流控窗口更新元数据。

字段类型说明
scope_kindFlowUpdateScopeKind更新作用域
update_reasonFlowUpdateReason更新原因
backpressure_levelFlowUpdateBackpressureLevel背压级别
connection_creditint连接级配额增量
session_creditint会话级配额增量
operation_creditint操作级配额增量
operation_idint目标操作 ID
retry_after_msint建议重试延迟(毫秒)
credit_epochint配额纪元(防乱序)
flagsFlowUpdateFlags标志位

ResultHintMetadata

服务端结果提示元数据(背压/队列状态预告)。

字段类型说明
budget_policyResultHintBudgetPolicy建议的预算策略
congestion_stateResultHintCongestionState当前拥塞状态
reasonResultHintReason提示原因
frame_idint关联帧 ID
queue_depthint当前队列深度
estimated_wait_msint估计等待时间(毫秒)

TransportProbeMetadata / TransportProbeAckMetadata

传输路径探测及应答元数据。

字段类型说明
probe_idint探测 ID
transport_idTransportId探测的传输类型
send_timestamp_usint发送时间戳(微秒)

SessionMigrateMetadata / SessionMigrateAckMetadata

会话迁移通知及确认元数据。

字段类型说明
session_idint迁移会话 ID
target_session_idint目标会话 ID
migration_tokenbytes迁移令牌

缓存消息元数据

CachePutMetadata

字段类型说明
object_kindCacheObjectKind对象类型
object_keyint对象键(uint32)
namespace_idint命名空间 ID
flagsCachePutFlags存储标志
body_lenint对象字节数

CacheAckMetadata

字段类型说明
object_kindCacheObjectKind对象类型
object_keyint对象键
statusCacheAckStatus存储结果

CacheInvalidateMetadata

字段类型说明
scopeCacheInvalidateScope失效范围
object_kindCacheObjectKind对象类型(OBJECT_KIND 范围时有效)
namespace_idint命名空间 ID
object_keyint对象键(OBJECT_KEY 范围时有效)

数据消息元数据

FrameSubmitMetadata

帧提交消息元数据。

字段类型说明
input_profileInputProfile输入数据格式
submit_modeSubmitMode提交模式(内联/引用/混合)
budget_policyBudgetPolicy允许的预算降质策略
payload_kind_bitmapPayloadKind载荷类型位掩码
tile_countint瓦片总数
section_countintTensor 分区数
inference_budget_msint最大推理预算(毫秒)
deadline_msint绝对截止时间(毫秒)

ResultPushMetadata

结果推送消息元数据。

字段类型说明
result_classResultClass结果完整性分类
result_flagsResultFlags结果标志
applied_budget_policyBudgetPolicy实际应用的降质策略
active_profile_idint推理时使用的 Profile ID
inference_msint推理耗时(毫秒)
queue_msint排队耗时(毫秒)
server_total_msint服务端总耗时(毫秒)
status_codeint状态码
tile_index_modeTileIndexMode瓦片索引编码格式
tile_base_idint瓦片索引基偏移
covered_tile_countint已覆盖的瓦片数
dropped_tile_countint已丢弃的瓦片数
reused_frame_idint重用的帧 ID(ResultClass.STALE_REUSE 时有效)
payload_kind_bitmapPayloadKind实际载荷类型位掩码
payload_frame_countint非 Tensor 载荷帧数

控制扩展类型

扩展块附加在 CLIENT_HELLO / SERVER_HELLO_ACK 元数据之后,由 ControlExtensionEntry 描述。

python
@dataclass(frozen=True, slots=True)
class ControlExtensionEntry:
    type_id: int        # 扩展类型 ID(见枚举常量)
    flags: ControlExtensionFlags
    payload: bytes

预定义扩展类型

方向说明
ClientHelloTransportPolicyExtensionC→S客户端声明传输策略偏好
ServerHelloAckTransportPolicyExtensionS→C服务端确认传输策略
ClientHelloLossToleranceExtensionC→S客户端声明丢包容忍度
ServerHelloAckLossToleranceExtensionS→C服务端确认丢包容忍度
ClientHelloPayloadCapabilitiesExtensionC→S客户端声明支持的载荷类型
ServerHelloAckPayloadCapabilitiesExtensionS→C服务端确认载荷能力

包构造函数

高层构造函数,返回完整的 NnrpPacket

python
def build_frame_submit_packet(
    session_id: int,
    frame_id: int,
    *,
    tile_ids: tuple[int, ...],
    sections: tuple[TensorSectionData, ...],
    input_profile: InputProfile,
    submit_mode: SubmitMode,
    budget_policy: BudgetPolicy,
    payload_kind_bitmap: PayloadKind,
    inference_budget_ms: int,
    deadline_ms: int,
    flags: HeaderFlags = HeaderFlags.NONE,
    view_id: int = 0,
    route_id: int = 0,
    trace_id: int = 0,
) -> NnrpPacket: ...

def build_result_push_packet(
    session_id: int,
    frame_id: int,
    *,
    tile_ids: tuple[int, ...],
    sections: tuple[TensorSectionData, ...],
    result_flags: ResultFlags,
    active_profile_id: int,
    inference_ms: int,
    queue_ms: int,
    server_total_ms: int,
    status_code: int,
    tile_index_mode: TileIndexMode,
    tile_base_id: int,
    result_class: ResultClass,
    applied_budget_policy: BudgetPolicy,
    reused_frame_id: int,
    covered_tile_count: int,
    dropped_tile_count: int,
    payload_kind_bitmap: PayloadKind,
    payload_frame_count: int,
    flags: HeaderFlags = HeaderFlags.NONE,
    view_id: int = 0,
    route_id: int = 0,
    trace_id: int = 0,
) -> NnrpPacket: ...

def build_result_push_mixed_packet(...) -> NnrpPacket: ...
def build_result_push_typed_payload_packet(...) -> NnrpPacket: ...
def build_result_drop_packet(session_id: int, frame_id: int, *, ...) -> NnrpPacket: ...
def build_frame_cancel_packet(session_id: int, frame_id: int, *, ...) -> NnrpPacket: ...
def build_ping_packet(session_id: int, *, frame_id: int = 0, ...) -> NnrpPacket: ...
def build_pong_packet(session_id: int, *, frame_id: int = 0, ...) -> NnrpPacket: ...
def build_flow_update_packet(session_id: int, *, metadata: FlowUpdateMetadata, ...) -> NnrpPacket: ...
def build_result_hint_packet(session_id: int, *, metadata: ResultHintMetadata, ...) -> NnrpPacket: ...

解包工具函数

python
def unpack_tensor_body(body: bytes, section_count: int) -> TensorBodyView:
    """从包体字节流解析 Tensor 分区数据视图。"""

def unpack_tile_index_block(body: bytes, mode: TileIndexMode) -> tuple[int, ...]:
    """解析瓦片索引块,返回瓦片 ID 元组。"""

def unpack_control_extension_block(payload: bytes) -> tuple[ControlExtensionEntry, ...]:
    """解析控制扩展块,返回所有扩展条目。"""

def unpack_inline_object_blocks(body: bytes) -> ...:
    """解析内联对象块。"""

def unpack_object_reference_blocks(body: bytes) -> tuple[ObjectReferenceBlock, ...]:
    """解析对象引用块。"""

def unpack_typed_payload_frames(body: bytes) -> ...:
    """解析非 Tensor 类型载荷帧列表。"""

def unpack_body(packet: NnrpPacket) -> ...:
    """通用包体解析入口(自动按消息类型分发)。"""

def validate_frame_submit_body(packet: NnrpPacket, metadata: FrameSubmitMetadata) -> None:
    """验证 FRAME_SUBMIT 包体完整性,不合法则抛出 ValueError。"""

典型使用场景

场景一:构造并发送帧提交包(低层方式)

通常应使用 ClientSession.submit_frame(),但在自定义传输适配器或压测工具中,可直接使用构造函数。

python
from nnrp.core import (
    build_frame_submit_packet,
    FrameSubmitMetadata,
    TensorSectionData,
)
from nnrp import SubmitMode, BudgetPolicy, InputProfile, TileIndexMode

metadata = FrameSubmitMetadata(
    frame_id=100,
    input_profile=InputProfile.CHANGED_TILES_LUMA,
    submit_mode=SubmitMode.INLINE,
    budget_policy=BudgetPolicy.ALLOW_PARTIAL,
    inference_budget_ms=8,
    tile_ids=(3, 7, 12),
    tile_index_mode=TileIndexMode.RAW_U16,
)
section = TensorSectionData(
    role_id=0,
    dtype_id=0,  # FP16
    tile_payloads=(tile_bytes_3, tile_bytes_7, tile_bytes_12),
)
packet = build_frame_submit_packet(
    session_id=42,
    metadata=metadata,
    sections=(section,),
)
raw = packet.pack()
await transport.send(raw)

场景二:解析握手扩展

自定义握手扩展通过 ControlExtensionEntry 携带,需按扩展类型 ID 分发解析。

python
from nnrp.core import (
    unpack_control_extension_block,
    ClientHelloTransportPolicyExtension,
)
from nnrp.enums import CLIENT_HELLO_TRANSPORT_POLICY_EXTENSION

extensions = unpack_control_extension_block(packet.metadata)
for entry in extensions:
    if entry.ext_type == CLIENT_HELLO_TRANSPORT_POLICY_EXTENSION:
        policy_ext = ClientHelloTransportPolicyExtension.unpack(entry.payload)
        print("Requested transport policy:", policy_ext.transport_policy)

场景三:结果包解析与 Tensor 重组

python
from nnrp.core import unpack_body, unpack_tensor_body
from nnrp.core.enums import MessageType

packet = await connection.receive_packet()
assert packet.header.msg_type is MessageType.RESULT_PUSH

body = unpack_body(packet)
# body.metadata: ResultPushMetadata
# body.tensor_body: TensorBodyView | None

if body.tensor_body:
    for section_idx, section in enumerate(body.tensor_body.sections):
        for tile_idx, tile_bytes in enumerate(section.tile_payloads):
            reconstruct_tile(section_idx, tile_idx, tile_bytes)

常见坑点

WARNING

  1. 元数据(metadata)vs 包体(bodyNnrpPacket.metadata 是小型结构化字段(帧 ID、预算、策略等),NnrpPacket.body 是大块二进制载荷(Tensor 数据)。两者物理上分开存储;若误用 unpack_tensor_body(packet.metadata, ...) 而非 packet.body,会得到解析错误或截断数据。

  2. FrameSubmitMetadata.tile_idsTensorSectionData.tile_payloads 的顺序必须一致tile_ids[i] 对应 tile_payloads[i],乱序会导致服务端用错误的位置索引拼合结果瓦片,渲染出现乱序伪影。

  3. validate_frame_submit_body 只做结构性校验,不验证张量数值合法性;调用它不代表数据一定能被推理引擎正确处理。

  4. build_result_push_mixed_packetbuild_result_push_typed_payload_packet 适用不同场景:前者同时包含 Tensor 和非 Tensor 载荷,后者只含非 Tensor 载荷(如 Token 流);混用会导致接收端解包出现空分区。

NNRP Documentation