Sven

Protocol Buffers


proto3

Proto3是谷歌发布的一种数据序列化格式,用于在不同系统之间传输和存储结构化数据。

它是Protocol Buffers的第三个版本,比起proto2有很多改进。

特点

  1. 更简洁易读的语法。proto3语法更加简洁清晰,去掉了一些不常用的语法,使得用户更容易上手。
  2. 更强的向后兼容性。proto3遵循“向前兼容,向后不兼容”的原则,可以更好地应对数据结构和协议的变化,保证了数据的完整性和稳定性。
  3. 更好的API设计。proto3的API更加友好和直观,让用户更容易使用。
  4. 更好的标准化。proto3采用了更加严格的标准,使得不同厂商之间的数据交换更加顺畅。

应用场景

Proto3广泛应用于分布式系统中的数据交换和存储,可以用于网络通信、存储数据、RPC调用等。

它可以提高系统的性能和稳定性,减少数据传输时的带宽消耗和存储空间占用。


.proto 文件语法

.proto文件是定义数据结构的文件,用于生成应用程序代码。

使用.proto文件可以方便地定义消息类型、服务、RPC方法等。

基本语法结构

基础语法
/*
* 指定 Protocol Buffer 的版本语法
* 指定要使用的 Protocol Buffer 版本
* 通常可以是 proto3 或 proto2
*/
syntax = "proto3";

/*
* 在 Protocol Buffer 中声明消息:
* 如你所见,每个字段在消息定义中都有一个唯一的编号。
* 这些字段编号用于在消息的二进制格式中标识你的字段,
* 一旦你的消息类型被使用,就不应该更改这些字段编号。
* 注意,编号范围 1 到 15 的字段仅占用一个字节进行编码(包含字段编号和字段类型)。
* 编号范围 16 到 2047 的字段则占用两个字节。因此,应为频繁出现的字段保留 1 到 15 的编号。
* 同时要留出一些编号以备将来添加新的频繁字段。
* 可以指定的最小字段编号是 1,最大是 2^29 - 1,即 536,870,911。
* 你也不能使用 19000 到 19999 的编号范围(FieldDescriptor::kFirstReservedNumber 到 FieldDescriptor::kLastReservedNumber),
* 因为它们是保留给 Protocol Buffers 实现的 —— 如果你使用这些保留编号,协议缓冲编译器会报错。
* 同样,不能使用之前已经被保留的字段编号。
*/

/*
声明消息的语法:
    message ${MessageName} {
        ${Scalar Value Type} ${FieldName1} = ${Tag Number1};
                .
                .
                .
        ${Scalar Value Type} ${FieldNameN} = ${Tag NumberN};
    }

默认值:如果消息中没有包含某个字段,则会使用默认值。
*/

message MessageTypes {
    /*
    * 标量值类型(Scalar Value Types)
    */
    string stringType = 1; // 字符串,必须是 UTF-8 或 7 位 ASCII 文本。默认值 = ""

    // 数字类型,默认值 = 0
    int32 int32Type = 2; // 使用变长编码,对负数效率较低,推荐使用 sint32。
    int64 int64Type = 3; // 使用变长编码,对负数效率较低,推荐使用 sint64。
    uint32 uInt32Type = 4; // 使用变长编码
    uint64 uInt64Type = 5; // 使用变长编码
    sint32 sInt32Type = 6; // 使用变长编码,适合负数编码,推荐替代 int32
    sint64 sInt64Type = 7; // 使用变长编码,适合负数编码,推荐替代 int64

    fixed32 fixed32Type = 8; // 总是占用 4 字节。对于大于 2^28 的值比 uint32 更高效。
    fixed64 fixed64Type = 9; // 总是占用 8 字节。对于大于 2^56 的值比 uint64 更高效。

    sfixed32 sfixed32Type = 10; // 总是占用 4 字节。
    sfixed64 sfixed64Type = 11; // 总是占用 8 字节。

    bool boolType = 12; // 布尔类型。默认值 = false

    bytes bytesType = 13; // 可以包含任意字节序列。默认值 = 空字节

    double doubleType = 14;
    float floatType = 15;

    enum Week {
        UNDEFINED = 0; // 枚举的默认值为 0
        SUNDAY = 1;
        MONDAY = 2;
        TUESDAY = 3;
        WEDNESDAY = 4;
        THURSDAY = 5;
        FRIDAY = 6;
        SATURDAY = 7;
    }
    Week wkDayType = 16;

    /*
    * 定义标量类型的集合
    * 语法:repeated ${ScalarType} ${name} = TagValue
    */
    repeated string listOfString = 17; // 字符串列表(List[String])
}

/*
* 在另一个消息定义中使用已定义的消息类型
*/
message Person {
    string fname = 1;
    string sname = 2;
}

message City {
    Person p = 1;
}

/*
* 嵌套消息定义
*/

message NestedMessages {
    message FirstLevelNestedMessage {
        string firstString = 1;
        message SecondLevelNestedMessage {
            string secondString = 2;
        }
    }
    FirstLevelNestedMessage msg = 1;
    FirstLevelNestedMessage.SecondLevelNestedMessage msg2 = 2;
}

/*
* 从文件导入消息定义
*/

// one.proto
// message One {
//     string oneMsg = 1;
// }

// two.proto
//  import "myproject/one.proto"
//  message Two {
//       string twoMsg = 2;
//  }


/*
* 高级主题
*/

/*
* 处理消息类型变更:
* 永远不要修改/重用已被删除字段的 Tag 编号
* 如果要添加/删除字段,应使用 reserved 关键字
* (https://developers.google.com/protocol-buffers/docs/proto3#updating)
*/

/*
* 保留字段
* 如果你需要添加或删除消息中的字段时,为了兼容性考虑,应使用保留字段。
* 使用 reserved 关键字可以实现向前/向后兼容
*/

message ReservedMessage {
    reserved 0, 1, 2, 3 to 10; // 一组不能被重用的编号
    reserved "firstMsg", "secondMsg", "thirdMsg"; // 一组不能被重用的字段名称
}

/*
* Any 类型
* Any 类型允许你在不引入具体 .proto 定义的情况下嵌入任意消息类型
* Any 内部包含任意消息的序列化字节,
* 同时包含一个 URL,作为该消息类型的唯一标识符,并能解析该类型
* 若要使用 Any,需先导入如下:
*/
/*
    import "google/protobuf/any.proto";
    message AnySampleMessage {
        repeated google.protobuf.Any.details = 1;
    }
*/


/*
* OneOf
* 某些场景下,消息中最多只包含一个字段。
* 注意:OneOf 字段不能使用 repeated
*/

message OneOfMessage {
    oneof msg {
        string fname = 1;
        string sname = 2;
    };
}

/*
* Map 映射类型
* Map 字段不能是 repeated。
* Map 的顺序是不保证的。
*/

message MessageWithMaps {
    map<string, string> mapOfMessages = 1;
}


/*
* Package(包名)
* 用于防止不同协议消息类型间的命名冲突
* 语法:
    package ${packageName};

    调用方式:
    ${packageName}.${messageName} = ${tagNumber};
*/

/*
* 服务(Services)
* 用于 RPC 系统的消息类型定义。
* protoc 编译器在生成各语言代码时会为服务生成存根方法。
*/

message SearchRequest {
    string queryString = 1;
}

message SearchResponse {
    string queryResponse = 1;
}
service SearchService {
    rpc Search (SearchRequest) returns (SearchResponse);
}

/*
* protoc 编译命令:
*/
protoc --proto_path=IMPORT_PATH --cpp_out=DST_DIR --java_out=DST_DIR --python_out=DST_DIR --go_out=DST_DIR --ruby_out=DST_DIR --objc_out=DST_DIR --csharp_out=DST_DIR path/to/file.proto

注意事项

message 的兼容

message语句定义了消息类型,格式为:

message MessageName {
    field_type field_name = field_number;
}

其中,MessageName为消息名称,field_type为字段类型,field_name为字段名,field_number为字段编号, 字段编号一旦被使用就不应该更改, 因为这些字段编号在二进制格式中标识了字段。

如果要废弃某个字段, 应该使用reserved语法保持向前的兼容, 注意不能在一个reserved语句中混合使用字段名和字段编号:

message Foo {
    reserved 2, 15, 9 to 11, 40 to max;
    reserved "foo", "bar";
}

enum的默认值和别名

enum语句定义了枚举类型,格式为:

enum EnumName {
    ENUM_VALUE = 0;
}

其中,EnumName为枚举类型名称,ENUM_VALUE为枚举值,0为枚举值的编号。

枚举的第一个常量映射到零:每个枚举定义都必须包含一个映射到零的常量作为其第一个元素。这是因为:

  • 必须有一个零值,这样我们就可以使用 0 作为数字默认值
  • 零值需要是第一个元素,以便与 第一个枚举值始终是默认值的proto2语义兼容。

可以通过将相同的值分配给不同的枚举常量来定义别名。为此,需要将allow_alias选项设置为true,否则协议编译器会在发现别名时生成一条警告消息。 尽管在反序列化期间所有别名值都有效,但在序列化时始终使用第一个值。

enum EnumAllowingAlias {
    option allow_alias = true;
    EAA_UNSPECIFIED = 0;
    EAA_STARTED = 1;
    EAA_RUNNING = 1;
    EAA_FINISHED = 2;
}

enum EnumNotAllowingAlias {
    ENAA_UNSPECIFIED = 0;
    ENAA_STARTED = 1;
    // ENAA_RUNNING = 1;  // Uncommenting this line will cause a warning message.
    ENAA_FINISHED = 2;
}

service的使用

service语句定义了服务类型,格式为:

service ServiceName {
    rpc MethodName(RequestType) returns (ResponseType);
}

其中,ServiceName为服务类型名称,MethodName为RPC方法名称,RequestType为请求消息类型,ResponseType为响应消息类型。

比如一个车辆相关的demo:

RidingBike.proto
syntax = "proto3";
import "protocBuf/include/google/protobuf/empty.proto";
import "proto/HistoryParam.proto";
package com.test.OuterClass;
option java_generic_services = true;

/**
* 骑行中车辆信息集合
*/
message RidingBike{
    string machineId     = 2;  // 设备id
    string userCode      = 3;  // 设备userCode
    int32 soc            = 4;  // 电量百分比
    int32 times          = 5;  // 今日骑行次数
    int32 minutes        = 6;  // 今日骑行分钟数
    string lonC          = 7;  // 经纬度
    string latC          = 8;
    bool machineFault    = 11; // 车辆是否故障
    // 骑行中状态专用
    string startRideTime = 10; // 开始骑行时间
    string phone = 12;         // 骑行中的人手机号
    ///~
}
// ------------------------------ 实时信息 ---------------------------------------
/**
* 骑行中车辆信息集合
*/
message RidingBikeList{
    repeated RidingBike data = 1;
}

/**
* 实时的骑行中车辆信息集合接口
*/
service RidingBikeRealTime{
    rpc service(google.protobuf.Empty) returns(RidingBikeList);
}
// --------------------------- 时间线回溯功能 ------------------------------------------
/**
* 骑行中车辆信息集合的历史记录
*/
message RidingBikeWithTime{
    RidingBikeList data = 1;
    string dateTime = 2;
}

message RidingBikeWithTimeList{
    repeated RidingBikeWithTime data = 1;
}

/**
* 骑行中车辆信息集合的历史记录接口
*/
service RidingHistory{
    rpc service(HistoryParam) returns(RidingBikeWithTimeList);
}

编译器编译后得到:

RidingBikeOuterClass.java中的一段
/**
 * <pre>
 **
 * 实时的骑行中车辆信息集合接口
 * </pre>
 *
 * Protobuf service {@code com.test.OuterClass.RidingBikeRealTime}
 */
public static abstract class RidingBikeRealTime
        implements com.google.protobuf.Service {

    public interface Interface {
        /**
         * <code>rpc service(.google.protobuf.Empty) returns (.com.test.OuterClass.RidingBikeList);</code>
         */
        public abstract void service(
                com.google.protobuf.RpcController controller,
                com.google.protobuf.Empty request,
                com.google.protobuf.RpcCallback<com.test.OuterClass.RidingBikeOuterClass.RidingBikeList> done);

    }
}

在java中实现这个service rpc接口:

package com.test.service;

import com.google.protobuf.Empty;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.test.proto.RidingBikeOuterClass.RidingBike;
import com.test.proto.RidingBikeOuterClass.RidingBikeList;
import com.test.proto.RidingBikeOuterClass.RidingBikeRealTime;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * 示例实现:用于演示如何实现 RidingBikeRealTime RPC 接口。
 * 实现逻辑:
 * 1. 实现 RPC 接口中的 service 方法。
 * 2. 构造响应数据 RidingBikeList 并通过回调返回。
 */
@Component
public class RidingRealTimeImpl extends RidingBikeRealTime {
    @Override
    public void service(RpcController controller, Empty request, RpcCallback<RidingBikeList> done) {
        // 构造示例数据列表
        List<RidingBike> bikeData = new ArrayList<>();

        // 构建一个示例 RidingBike 对象
        RidingBike bike = RidingBike.newBuilder()
                .setMachineId("123")
                .setUserCode("user001")
                .setSoc(80)
                .setTimes(2)
                .setMinutes(30)
                .setLonC("120.123456")
                .setLatC("30.123456")
                .setMachineFault(false)
                .setStartRideTime("2025-01-01 10:00:00")
                .setPhone("1234567890")
                .build();

        // 添加到列表
        bikeData.add(bike);

        // 构建响应对象
        RidingBikeList.Builder listBuilder = RidingBikeList.newBuilder();
        listBuilder.addAllData(bikeData);

        // 返回响应
        done.run(listBuilder.build());
    }
}

整合websocket

package com.test.websocket;

import com.test.protobuf.RequestOuterClass.Request;
import com.test.service.RidingRealTimeImpl;
import com.test.utils.WSSessionUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.BinaryWebSocketHandler;

import javax.annotation.Resource;
import java.nio.ByteBuffer;

@Component
@Slf4j
public class RealTimeHandler extends BinaryWebSocketHandler {

    @Resource
    private RidingRealTimeImpl ridingRealTime;

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        super.afterConnectionEstablished(session);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        super.afterConnectionClosed(session, status);
    }

    @Override
    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
        ByteBuffer payload = message.getPayload();
        Request request = Request.parseFrom(payload); // 反序列化二进制数据

        switch (request.getService()) {
            case Ser_RIDING_BIKE:
                log.info("riding bike");
                ridingRealTime.service(null, null, result -> {
                    WSSessionUtils.SUCCESS(session, result); // 推给客户端
                });
                return;

            case UNRECOGNIZED:
                log.info("unrecognized service");
                WSSessionUtils.CLIENT_ERROR(session);
                break;

            default:
                log.info("unsupported service: {}", request.getService());
                WSSessionUtils.CLIENT_ERROR(session);
        }
    }
}

package com.test.config;

import com.test.Interceptor.HandlerInterceptor;
import com.test.websockets.ChatHandler;
import com.test.websockets.HistoryHandler;
import com.test.websockets.RealTimeHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WSConfig implements WebSocketConfigurer {
    private final HandlerInterceptor tokenInterceptor;
    private final RealTimeHandler realTimeHandler;
    private final HistoryHandler historyHandler;

    public WSConfig(HandlerInterceptor tokenInterceptor,
                    RealTimeHandler realTimeHandler,
                    HistoryHandler historyHandler) {
        this.tokenInterceptor = tokenInterceptor;
        this.realTimeHandler = realTimeHandler;
        this.historyHandler = historyHandler;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(realTimeHandler, "/realTime")
                .addInterceptors(tokenInterceptor)
                .setAllowedOrigins("*");

        registry.addHandler(historyHandler, "/history")
                .addInterceptors(tokenInterceptor)
                .setAllowedOrigins("*");
    }
}

proto 后端工程实践

.proto文件是定义数据结构的文件,使用.proto文件可以方便地定义消息类型、服务、RPC方法等。它具有简洁明了、易于扩展、跨语言支持等特点,是一种非常优秀的数据交换格式。

对于Java,编译器生成一个.java文件,其中包含每种消息类型的类,以及Builder用于创建消息类实例的特殊类。

maven模块文件结构

参考: https://github.com/protocolbuffers/protobuf/tree/main/java

readme.txt
build.ps1(核心编译指令: .\protocBuf\bin\protoc.exe --java_out=..\..\ .\proto\*.proto)
build.sh
pom.xml(引入protobuf-java, protobuf-java-util)

核心proto

syntax = "proto3";
import "proto/HistoryParam.proto";
package com.test.OuterClass;
option java_generic_services = true;

enum Service{
    Ser_WEATHER                       = 0;    // 天气
    Ser_UN_USED                       = 1;    // 闲置
    Ser_RIDING_BIKE                   = 2;    // 骑行中
    Ser_PROHIBIT_AREA                 = 3;    // 禁停区
    Ser_PARK_POINT_WITH_WEIGHT        = 4;    // 停车点(带权/有三天借还车数据)
    Ser_PARK_POINT                    = 5;    // 停车点基本信息, 未实现
    Ser_NO_POWER                      = 6;    // 馈电
    Ser_MAINTENANCE_WORKER_TRACK      = 7;    // 运维工作轨迹
    Ser_MAINTENANCE_WORKER_STATISTICS = 8;    // 运维工作统计
    Ser_MAINTENANCE_WORKER            = 9;    // 运维基本信息
    Ser_AB_NORMAL_STATISTICS          = 10;   // 异常统计信息
    Ser_AB_NORMAL_ALL_FAULT           = 11;   // 所有异常信息
}


message Request{
    Service service     = 1;  // 调用某个具体服务
    HistoryParam params = 2;  // 传了就是history, 不传就是realtime
}

protobuf.js 实践

环境配置

protobuf在它的readme中提到了一个protobuf-javascript的实现, 但是我测试的时候它报了 3221225781 相关的异常, 根据grpc-web/issues/697的提示操作不起作用后放弃了官方推荐的实现

根据npm的数据, protobuf.js可能是一个很好的实现;

  1. npm install protobufjs

    注意: 不是protocbuf.js

  2. npm install protobufjs-cli

    提供了编译工具, 可以根据proto定义文件生成a.js, a.t.js等

  3. 在npm脚本中添加protoc, 用于编译.proto文件为es6语法, 生成d.ts文件

    "scripts": {
        "start": "react-scripts start",
        "build": "react-scripts build",
        "test": "react-scripts test",
        "eject": "react-scripts eject",
        "protoc": "pbjs -t static-module -w es6 -o src/proto/proto.js src/proto/*.proto && pbjs -t static-module src/proto/*.proto | pbts -o src/proto/proto.d.ts -"
    }
  4. 一个demo

syntax = "proto3";
message TestInt {
    int32 aInt = 1;
}

分析demo

demo中的{aInt: 152} 会被序列化为 08 98 01

第一个字节08的二进制0000 1000为: field tag(前5位) 和 type(后3位)

故: tag = 00001, type = 000

由于aInt是数字类型: 后面的字节标识具体的值, 字符串类型还会有一个字节标识字符串长度

98 01 = 1001 1000, 0000 0001

每个字节的第一位(MSB, most significant bit)标识后面有没有了, 后7位才是具体的数据

数据= 001 1000, 000 0001 ≠ 152

看起来不对啊, 因为用的是小端顺序, 转换为大端顺序即可

数据其实= 000 0001, 001 1000 === 152


注意

  • 注意proto3 默认全部都是optional的(包括oneOf本身也是), 没有required和optional之分了;
  • oneOf 会带一个typeOf字段, 可以用typeOf字符串找到解码器;

On this page