前言

brpc 是用 c++语言编写的工业级 RPC 框架,常用于搜索、存储、机器学习、广告、推荐等高性能系统

RPC(Remote Procedure Call,远程过程调用)框架指用于在网络中实现进程间通信的技术,使得程序能够调用远程计算机上的程序或服务,就像调用本地程序一样。


特性

特性 描述
高性能 针对高并发场景优化,支持异步 IO,低延迟和高吞吐量。
多种传输协议 支持 HTTP/2、gRPC、TCP 等多种协议,灵活选择适合的方案。
流式支持 支持单向和双向流式 RPC 调用,适合实时数据传输场景。
负载均衡 内置多种负载均衡策略,支持客户端和服务端负载均衡。
服务发现 提供内置服务发现机制,支持与外部工具(如 Etcd、Consul)集成。
灵活序列化方式 默认使用 Protobuf,还支持 JSON 等其他序列化格式。
易用性 提供简单易懂的 API,快速上手和实现服务。
扩展性 支持插件机制,可以根据需求扩展功能。

使用场景

  1. 微服务架构:作为微服务间通信的基础框架,支持快速构建和部署微服务应用。
  2. 高并发处理:适用于需要处理大量并发请求的应用,如在线支付、即时通讯等。
  3. 实时数据传输:支持实时数据流的场景,例如视频直播、在线游戏等。
  4. 大规模分布式系统:适合于构建大规模的分布式系统,支持动态扩展和负载均衡。
  5. 跨语言服务调用:支持多种编程语言的服务调用。

brpc && grpc 对比

下面是 gRPC 与 brpc 的对比表格,涵盖了 主要特性和优缺点:

特性 gRPC brpc
语言支持 多种语言(C++, Java, Python, Go等) 多种语言(C++, Python, Java等)
传输协议 HTTP/2 自定义协议,支持多种传输方式
序列化方式 Protobuf Protobuf、JSON等
性能 高性能,适合微服务架构 较高性能,优化了并发处理
流式支持 支持单向和双向流式 支持双向流式
负载均衡 需要外部支持或使用 gRPC 的内置功能 内置支持多种负载均衡策略
服务发现 依赖外部服务发现(如 Consul) 内置服务发现机制
生态系统 强大的生态系统,广泛应用 相对较小但在特定场景下表现出色
社区活跃度 活跃,广泛使用 较小,但有特定用户群体
文档与支持 丰富的文档和社区支持 文档相对较少,社区较小

总结

  • gRPC 适合需要跨语言和高性能通信的微服务架构,具有强大的生态系统和社区支持。
  • brpc 更加专注于高性能的 RPC 通信,并且在某些场景下具有更好的灵活性和效率,适合特定需求的使用者。

相关类与接口

日志输出类与接口

日志输出类 包含头文件: #include <butil/logging.h>
在编写项目时,日志输出完全根据个人以及项目需求以使用不同的日志输出类,这里介绍如何关闭brpc自带的日志输出类:

namespace logging {
    // 日志输出目标枚举
    enum LoggingDestination {
        LOG_TO_NONE = 0  // 不输出日志
    };

    // 日志设置结构体
    struct BUTIL_EXPORT LoggingSettings {
        // 构造函数,初始化日志设置
        LoggingSettings();

        // 日志输出目标,决定日志将被发送到何处
        LoggingDestination logging_dest;
    };

    // 初始化日志系统
    // 参数:
    //   settings - 包含日志设置的 LoggingSettings 对象
    // 返回:
    //   bool - 初始化是否成功
    bool InitLogging(const LoggingSettings& settings);
}

// 0. 关闭 brpc 默认日志输出
logging::LoggingSettings settings;  // 创建一个 LoggingSettings 对象
settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;  // 设置日志输出为不输出
logging::InitLogging(settings);  // 初始化日志系统,应用上述设置

protobuf类与接口

namespace google
{
    namespace protobuf
    {
        // Closure 类用于定义可调用对象的接口
        class PROTOBUF_EXPORT Closure
        {
        public:
            // 默认构造函数
            Closure() {}

            // 虚析构函数,用于确保派生类被正确析构
            virtual ~Closure();

            // 纯虚函数,派生类需要实现该函数以定义具体的操作
            virtual void Run() = 0;
        };

        // 创建一个新的回调对象
        // 参数:
        //   function - 指向要调用的无参数函数的指针
        // 返回:
        //   Closure* - 指向新创建的 Closure 对象的指针
        inline Closure *NewCallback(void (*function)());

        // RpcController 类用于控制 RPC 调用的状态
        class PROTOBUF_EXPORT RpcController
        {
        public:
            // 检查 RPC 调用是否失败
            // 返回:
            //   bool - 如果调用失败则返回 true,否则返回 false
            bool Failed();

            // 获取错误信息文本
            // 返回:
            //   std::string - 表示错误的文本信息
            std::string ErrorText();
        };
    }
}


服务端类与接口

namespace brpc
{
    // ServerOptions 结构用于配置服务器选项
    struct ServerOptions
    {
        // 空闲超时时间,超过该时间后关闭连接
        int idle_timeout_sec; // 默认值: -1(禁用)
        // 服务器线程数量,默认值为 CPU 核心数
        int num_threads;      // 默认值: #cpu-cores
        // 其他可能的选项...
    };

    // ServiceOwnership 枚举定义服务的所有权管理方式
    enum ServiceOwnership {
        // 当添加服务失败时,服务器负责删除服务对象
        SERVER_OWNS_SERVICE,
        // 当添加服务失败时,服务器不会删除服务对象
        SERVER_DOESNT_OWN_SERVICE
    };

    // Server 类表示一个 BRPC 服务器
    class Server
    {
    public:
        // 添加服务到服务器
        // 参数:
        //   service - 要添加的服务对象
        //   ownership - 服务的所有权类型
        // 返回:
        //   int - 返回操作结果的状态码
        int AddService(google::protobuf::Service *service, ServiceOwnership ownership);

        // 启动服务器
        // 参数:
        //   port - 监听的端口号
        //   opt - 服务器选项
        // 返回:
        //   int - 返回启动状态
        int Start(int port, const ServerOptions *opt);

        // 停止服务器
        // 参数:
        //   closewait_ms - 等待关闭的时间(不再使用)
        // 返回:
        //   int - 返回停止状态
        int Stop(int closewait_ms /*not used anymore*/);

        // 等待服务器完成所有任务并退出
        // 返回:
        //   int - 返回加入状态
        int Join();

        // 运行服务器,直到收到退出请求
        void RunUntilAskedToQuit();
    };

    // ClosureGuard 类用于确保在作用域结束时执行回调
    class ClosureGuard
    {
    public:
        explicit ClosureGuard(google::protobuf::Closure *done)
            : _done(done) {} // 初始化回调指针

        ~ClosureGuard()
        {
            if (_done)
                _done->Run(); // 在析构时调用回调
        }

    private:
        google::protobuf::Closure *_done; // 存储回调指针
    };

    // HttpHeader 类表示 HTTP 请求或响应的头部信息
    class HttpHeader
    {
    public:
        // 设置内容类型
        void set_content_type(const std::string &type);

        // 获取指定键的头部值
        const std::string *GetHeader(const std::string &key);

        // 设置指定键的头部值
        void SetHeader(const std::string &key, const std::string &value);

        // 获取 URI
        const URI &uri() const { return _uri; }

        // 获取 HTTP 方法
        HttpMethod method() const { return _method; }

        // 设置 HTTP 方法
        void set_method(const HttpMethod method);

        // 获取状态码
        int status_code();

        // 设置状态码
        void set_status_code(int status_code);
        
    private:
        URI _uri;          // 存储 URI 信息
        HttpMethod _method; // 存储 HTTP 方法
        // 其他可能的成员...
    };

    // Controller 类用于管理 RPC 调用
    class Controller : public google::protobuf::RpcController
    {
    public:
        // 设置超时时间
        void set_timeout_ms(int64_t timeout_ms);

        // 设置最大重试次数
        void set_max_retry(int max_retry);

        // 获取响应消息
        google::protobuf::Message *response();

        // 获取 HTTP 响应头
        HttpHeader &http_response();

        // 获取 HTTP 请求头
        HttpHeader &http_request();

        // 检查 RPC 调用是否失败
        bool Failed();

        // 获取错误文本
        std::string ErrorText();

        // 定义 RPC 响应后的回调函数类型
        using AfterRpcRespFnType = std::function<
            void(Controller *cntl,
                 const google::protobuf::Message *req,
                 const google::protobuf::Message *res)>; 

        // 设置 RPC 响应后的回调函数
        void set_after_rpc_resp_fn(AfterRpcRespFnType &&fn);

    private:
        // 其他成员...
    };
}


客户端类与接口

namespace brpc
{
    // ChannelOptions 结构用于配置通道选项
    struct ChannelOptions
    {
        // 请求连接超时时间,单位为毫秒
        int32_t connect_timeout_ms; // 默认值: 200 毫秒

        // RPC 请求超时时间,单位为毫秒
        int32_t timeout_ms;          // 默认值: 500 毫秒

        // 最大重试次数
        int max_retry;               // 默认值: 3

        // 序列化协议类型
        AdaptiveProtocolType protocol; // 默认值: "baidu_std"
        
        // 其他可能的选项...
    };

    // Channel 类表示一个 RPC 通道,用于和服务器进行通信
    class Channel : public ChannelBase
    {
    public:
        // 初始化接口
        // 参数:
        //   server_addr_and_port - 服务器地址和端口
        //   options - 指向 ChannelOptions 的指针,用于配置通道
        // 返回:
        //   int - 成功返回 0,失败返回错误码
        int Init(const char *server_addr_and_port,
                 const ChannelOptions *options);
    };

    // 其他相关类和功能...
}


使用

同步调用 & 异步调用

同步调用是指在程序中,当一个函数被调用时,调用者会等待被调用的函数执行完毕并返回结果后,才会继续执行后面的代码。

放在brpc中,同步调用是指客户端在发送请求后,会以阻塞的方式等待服务端的响应;

首先编写proto文件:

syntax = "proto3";

package emp;

option cc_generic_services = true; // 生成通用服务代码 (用于rpc)

message EchoRequest {
    string message = 1;
}

message EchoResponse {
    string message = 1;
}

service EchoService {
    rpc Echo(EchoRequest) returns (EchoResponse);
}

编辑后开始写server代码:

#include <butil/logging.h>
#include <brpc/server.h>
#include "main.pb.h"

class EchoServiceT : public emp::EchoService {
public:
    EchoServiceT() {}
    ~EchoServiceT() {}
    
    // 重写父类回声函数
    void Echo(google::protobuf::RpcController* controller,
                       const ::emp::EchoRequest* request,
                       ::emp::EchoResponse* response,
                       ::google::protobuf::Closure* done)
    {
        brpc::ClosureGuard done_guard(done); // 自动调用done->Run()
        std::cout << "接收到消息: " << request->message() << std::endl;

        std::string msg = "响应消息: " + request->message();
        response->set_message(msg);
    }
};

int main(int argc, char* argv[]) {
    // 0. 关闭brpc默认日志输出
    logging::LoggingSettings settings;
    settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;
    logging::InitLogging(settings);
    // 1. 初始化服务器对象
    brpc::Server server;
    // 2. 注册服务
    EchoServiceT echo_service;
        // SERVER_OWNS_SERVICE 服务器负责管理销毁 该服务
        // SERVER_DOESNT_OWN_SERVICE 服务器不负责该服务的生命周期
    auto ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);
    if (ret != 0) {
        std::cout << "添加RPC服务失败。" << std::endl;
        return -1;
    }

    // 3. 启动服务器
    brpc::ServerOptions options;
    options.idle_timeout_sec = -1; // 设置超时时间 为-1,表示不超时
    options.num_threads = 1; // 设置线程数
    ret = server.Start(8080, &options);
    if (ret != 0) {
        std::cout << "启动服务器失败。" << std::endl;
        return -1;
    }

    server.RunUntilAskedToQuit(); // 阻塞等待直到收到退出信号

    return 0;
}

客户端代码:

#include <brpc/channel.h>
#include <thread>
#include <iostream>
#include "main.pb.h"

#define SYNC 0

// 异步回调函数
void callback(brpc::Controller *cntl, emp::EchoResponse *resp) {
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    std::unique_ptr<emp::EchoResponse> resp_guard(resp);
    if (cntl->Failed()) {
        std::cout << "RPC调用失败: " << cntl->ErrorText() << std::endl;
        return;
    }
    std::cout << "收到响应: " << resp->message() << std::endl;
}

int main(int argc, char* argv[]) {
    // 1. 创建信道 连接服务器
    brpc::ChannelOptions options;
    options.protocol = "baidu_std"; // 序列化协议 默认
    options.connect_timeout_ms = -1; // 连接超时时间 -1表示永不超时
    options.timeout_ms = -1; // 超时时间 -1表示永不超时
    options.max_retry = 3; // 最大重试次数
    brpc::Channel channel;
    if (channel.Init("127.0.0.1:8080", &options) != 0) {
        std::cout << "初始化信道失败" << std::endl;
        return -1;
    }
    // 2. 构造EchoService_Stub对象(用于RPC调用)
    emp::EchoService_Stub stub(&channel);
    // 3. 进行RPC调用
    emp::EchoRequest req;
    std::cout << "请输入消息: ";
    std::string msg;
    getline(std::cin, msg);
    req.set_message(msg);
    // 4. 构造Controller对象(用于控制RPC调用)
    brpc::Controller *cntl = new brpc::Controller();
    emp::EchoResponse *resp = new emp::EchoResponse();
    #if SYNC // 同步调用
        stub.Echo(cntl, &req, resp, nullptr); // google::protobuf::Closure *done: 传入nullptr代表同步调用
        if(cntl->Failed()) {
            std::cout << "RPC调用失败: " << cntl->ErrorText() << std::endl;
            return -1;
        }

        std::cout << "RPC调用成功, 响应信息: " << resp->message() << std::endl;
        delete cntl;
        delete resp;
    #else // 异步调用
    auto clusure = google::protobuf::NewCallback(callback, cntl, resp);
    stub.Echo(cntl, &req, resp, clusure);
    std::cout << "异步调用成功" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));

    #endif
    return 0;
}

在这里插入图片描述


封装

封装思想

由于不同的服务调用使用不同的 Stub,其封装意义不大。因此,我们主要封装通信所需的 Channel。当需要进行服务调用时,只需通过服务名称获取对应的 Channel,然后实例化 Stub 进行调用即可。

设计概要

  • Channel 管理类

    • 每个服务可能有多个主机提供服务,因此一个服务可能对应多个 Channel。需要管理这些 Channel,并提供获取指定服务 Channel 的接口。
    • 在进行 RPC 调用时,根据 Round Robin(RR)轮转策略选择 Channel。
  • 服务声明接口

    • 整体项目中通常会提供多个服务,当前可能并不需要用到所有服务。因此,通过声明来告知模块当前关心的服务,并建立连接进行管理。未声明的服务即使上线也不需要进行连接的建立。
  • 服务上线处理接口

    • 提供新增指定服务的 Channel 的接口,以便在服务上线时进行管理。
  • 服务下线处理接口

    • 提供删除指定服务下的 Channel 的接口,以便在服务下线时进行管理。

代码

class ServiceChannel 
{
public:
    using ptr = std::shared_ptr<ServiceChannel>;
    using ChannelPtr = std::shared_ptr<brpc::Channel>;

    ServiceChannel(const std::string& service_name) : 
        _index(0), _service_name(service_name) {}

    void append(const std::string& host)
    {
        // 创建信道
        auto channel = std::make_shared<brpc::Channel>();
        brpc::ChannelOptions options;
        options.protocol = "baidu_std";
        options.timeout_ms = -1;
        options.connect_timeout_ms = -1;
        options.max_retry = 3;

        int ret = channel->Init(host.c_str(), &options);
        if (ret != 0)
        {
            LOG_ERROR("初始化{}-{}信道失败", _service_name, host);
            return;
        }
        std::unique_lock<std::mutex> lock(_mutex);
        _hosts.insert({host, channel});
        _channels.push_back(channel);
    } /* 服务上线一个节点 - 调用append新增信道 */

    void remove(const std::string& host)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        auto it = _hosts.find(host);
        if(it == _hosts.end()) {
            LOG_WARN("{}-{}删除时,未找到信道信息", _service_name, host);
            return;
        }

        for(auto vit = _channels.begin(); vit != _channels.end(); ++vit) 
        {
            if(*vit == it->second) {
                _channels.erase(vit);
                break;
            }
        }
        _hosts.erase(it);

        LOG_INFO("{}-{}删除成功", _service_name, host);
    } /* 服务下线一个节点 - 调用remove释放信道 */

    ChannelPtr getChannel() {
        std::unique_lock<std::mutex> lock(_mutex);
        if(_channels.empty()) {
            LOG_ERROR("当前没有能提供{}服务的节点", _service_name);
            return ChannelPtr();
        }

        int32_t idx = _index++ % _channels.size(); // 轮转索引
        return _channels[idx];
    }

private:
    std::mutex _mutex; // 互斥锁
    int32_t _index; // 轮转索引
    std::string _service_name; // 服务名称
    std::vector<ChannelPtr> _channels; // 服务对应的信道集合
    std::unordered_map<std::string, ChannelPtr> _hosts; // // 主机地址到信道映射
};

class ServiceManager
{
public:
    using ptr = std::shared_ptr<ServiceManager>;
    ServiceManager() {} 

    /* 获取指定服务的信道节点 */
    ServiceChannel::ChannelPtr getChannel(const std::string& service_name) {
        std::unique_lock<std::mutex> lock(_mutex);
        auto it = _services.find(service_name);
        if(it == _services.end()) {
            LOG_ERROR("当前没有能提供{}服务的节点", service_name);
            return ServiceChannel::ChannelPtr();
        }

        return it->second->getChannel();
    }

    /* 声明关注的服务 */
    void declareTrackService(const std::string& service_name) {
        std::unique_lock<std::mutex> lock(_mutex);
        _track_services.insert(service_name);
    }

    /* 服务上线回调 */
    void onServiceOnline(const std::string& service_instance, const std::string& host) {
        std::string service_name = getServiceName(service_instance);
        ServiceChannel::ptr service;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto tit = _track_services.find(service_name);
            if(tit == _track_services.end()) {
                LOG_DEBUG("{}-{}服务上线了 (不在关注列表中)", service_name, host);
                return;
            }

            auto sit = _services.find(service_name);
            if(sit == _services.end()) {
                service = std::make_shared<ServiceChannel>(service_name);
                _services.insert({service_name, service});
            } else {
                service = sit->second;
            }
        }

        if(!service) {
            LOG_ERROR("{}服务新增失败", service_name);
            return;
        }

        service->append(host);
        LOG_DEBUG("{}服务新增成功", service_name);
    }

    /* 服务下线回调 */
    void onServiceOffline(const std::string& service_instance, const std::string& host) {
        std::string service_name = getServiceName(service_instance);
        ServiceChannel::ptr service;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto tit = _track_services.find(service_name);
            if(tit == _track_services.end()) {
                LOG_DEBUG("{}-{}服务下线了 (不在关注列表中)", service_name, host);
                return;
            }

            auto sit = _services.find(service_name);
            if(sit == _services.end()) {
                LOG_WARN("删除{}服务时,未找到管理对象", service_name);
                return;
            }
            service = sit->second;
        }
        service->remove(host);
        LOG_DEBUG("{}服务删除成功", service_name);
    }
private:
    std::string getServiceName(const std::string& service_instance) {
        auto pos = service_instance.find_last_of('/');
        if(pos == std::string::npos) {
            return service_instance;
        }
        return service_instance.substr(0, pos);
    }

private:
    std::mutex _mutex;
    std::unordered_set<std::string> _track_services; // 跟踪的服务集合
    std::unordered_map<std::string, ServiceChannel::ptr> _services; // 服务名称到信道集合的映射
};
Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐