为 reqwest 增加中间件支持

译者:华为-王江桐,华为-周紫鹏


原文

继续我们的开源系列,我们为无处不在的reqwest Rust crate提供了一个中间件适配器。

这是我们开源系列的第二篇文章,我们将在其中讨论TrueLayer的工程挑战并开源我们的解决方案。如果你错过了我们的第一篇推文,我们的第一篇推文是Rust中的gRPC负载平衡。(原文/中文月刊

本文主题是reqwest-middleware,一个构建在reqwest HTTP客户端之上的crate,用于提供中间件功能。

问题

当通过网络与内部和外部服务进行通信时,由于服务会失败,大规模运行应用程序需要内置的韧性。

重试是提高可靠性的常用策略。机制相当简单:将每个请求包装在一个循环中并重试,直到获得成功响应或尝试次数用完为止。

我们的代码库中有数十个客户端:我们不想以特别的方式为每个客户端重新实现重试。

同时,我们更愿意让我们的域代码不受这种网络级别的限制——最完美的方式是,在 HTTP 客户端本身中透明地实现重试。

我们可以编写一个RetryHttpClient来包装标准客户端,以增加重试功能——但重试并不是全部。我们希望 HTTP 客户端处理其他功能:分布式跟踪header的传播、缓存、日志记录。

但是我们不想编写TracingRetryableClientTracingRetryableCachingHttpClientRetryableTracingCachingHttpClient(顺序很重要!)以及所有其他可能的组合。

我们想要一种可组合的抽象模式。

所有这些功能都遵循相同的模式:

  • 我们想在执行请求之前和之后运行一些任意逻辑
    
  • 该逻辑完全独立于问题域,它只关注底层传输和整个组织统一的要求(例如日志记录标准)。
    

好消息是,这是软件系统中常见的一个问题,因此也有一个通用的解决方案:中间件。

(P.S.:请注意,我们在本文中指的是一种非常特殊的中间件,该术语本身更为笼统。有关中间件在其他上下文中的用法,请参阅中间件的维基百科页面

Rust HTTP客户端中间件

在TrueLayer,我们使用reqwest作为我们所有Rust服务的 HTTP 客户端。

我们选择它是因为它提供了async-first API,与tokio兼容,并且它已广泛的在生产中使用。

遗憾的是,reqwest不支持现有即用的中间件。

我们的选择是什么?

  • 使用现成的crate替换<span style="background-color:#D2D2D2">reqwest</span>,或者在<span style="background-color:#D2D2D2">reqwest</span>之上做拓展。在撰写本文时,对我们来说,没有其他完善的、支持中间件的 Rust HTTP 客户端能够提供与reqwest一样的功能。[surf](https://crates.io/crates/surf)非常流行并且内置中间件,但[它需要引入async-std](https://github.com/http-rs/surf/issues/295)。
    
  • 尝试去获取上游实现的中间件支持。reqwest的维护者从 2017 年开始讨论这个问题(请参阅[ISSUE](https://github.com/seanmonstar/reqwest/issues/155)),但似乎仍然没有达成共识,甚至没有就此类功能是否属于该crate 达成共识。因此,我们不太可能在短期内完成某些事情。
    
  • 最后一个选择是,包装<span style="background-color:#D2D2D2">reqwest</span>并在其上实现中间件,所以这就是我们采用的方法。<span style="background-color:#D2D2D2">reqwest-middleware</span>诞生了。
    

使用reqwest-middleware我们能够将中间件附加到Client上,然后就像我们直接使用reqwest一样发出请求:

use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
use reqwest_tracing::TracingMiddleware;

#[tokio::main]
async fn main() {
    let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
    let client = ClientBuilder::new(reqwest::Client::new())
        .with(TracingMiddleware)
        .with(RetryTransientMiddleware::new_with_policy(retry_policy))
        .build();
    run(client).await;
}

async fn run(client: ClientWithMiddleware) {
    // free retries!
    client
        .get("https://some-external-service.com")
        .header("foo", "bar")
        .send()
        .await
        .unwrap();
}

现有技术

在讨论我们的实现之前,让我们先看看现有的一些常用的中间件API:

Surf

Surf 是一个Rust HTTP客户端。这是他们文档中的中间件示例:


#![allow(unused)]
fn main() {
/// Log each request's duration
#[derive(Debug)]
pub struct Logger;

#[surf::utils::async_trait]
impl Middleware for Logger {
    async fn handle(
        &self,
        req: Request,
        client: Client,
        next: Next<'_>,
    ) -> Result<Response> {
        println!("sending request to {}", req.url());
        let now = time::Instant::now();
        let res = next.run(req, client).await?;
        println!("request completed ({:?})", now.elapsed());
        Ok(res)
    }
}
}

我们能看到,它接受一个请求对象和一个next值,该值可用于将该请求转发到剩余的管道中,并返回一个Response。这让我们在向下转发之前,可以通过改变请求方式来处理请求,我们还可以在返回之前更改从next.run返回的res值。

我们甚至可以在next周围使用控制流,它允许重试和短路:


#![allow(unused)]
fn main() {
#[derive(Debug)]
pub struct ConditionalCall;

#[surf::utils::async_trait]
impl Middleware for ConditionalCall {
    async fn handle(
        &self,
        req: Request,
        client: Client,
        next: Next<'_>,
    ) -> Result<Response> {
        // Silly example: return a dummy response 50% of the time
        if rand::random()::<bool>() {
          let res = next.run(req, client).await?;
          Ok(res)
        } else {
          let response = http_types::Response::new(StatusCode::Ok);
          Ok(response)
        }
    }
}
}

Express

Express是一个完善的Node.js Web框架。它的中间件被编写为普通函数,这是他们文档中的一个例子:

app.use(function (req, res, next) {
  console.log('Time:', Date.now())
  next()
})

这与surf的方法非常相似,除了我们使用response对象并可以直接改变它:中间件函数不返回任何内容。

Tower

tower是用于网络应用程序的通用Rust组件库。

它被用于许多著名crate中,例如hypertonictower的中间件有点复杂,很可能是因为,他们不想强制用户使用动态调度(例如async_trait)。

至于其他库,这是tower文档中给出的示例:


#![allow(unused)]
fn main() {
pub struct LogLayer {
    target: &'static str,
}

impl<S> Layer<S> for LogLayer {
    type Service = LogService<S>;

    fn layer(&self, service: S) -> Self::Service {
        LogService {
            target: self.target,
            service
        }
    }
}

// This service implements the Log behavior
pub struct LogService<S> {
    target: &'static str,
    service: S,
}

impl<S, Request> Service<Request> for LogService<S>
where
    S: Service<Request>,
    Request: fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // Insert log statement here or other functionality
        println!("request = {:?}, target = {:?}", request, self.target);
        self.service.call(request)
    }
}
}

忽略用于反压的poll_ready方法,towerService被定义为从请求到响应的函数:call返回一个Future,其中Future::ItemService::Response的关联类型。

surf中的异步中间件的trait更为简单,因为它依赖于过程宏(async_trait),在trait中使用async fn语法——在底层它转换为boxing futures 。这是必要的,因为trait方法尚不支持异步。请参阅Nicholas D. Matsakis的这篇文章以深入了解原因。

tower中的中间件是通过Layer trait定义的,该trait将一个服务映射到另一个服务。实现这个特性通常涉及让一个通用结构包装一些Service并委托对它的调用。

被包装的Servicesurfexpress中的next参数起到相同的作用。它提供了一种调用中间件链其余部分的方法。这种方法仍然允许我们使用next的API相同的方式处理请求和响应。

Finagle

Finagle是一个用Scala编写的JVM RPC系统。让我们也从finagle文档中举一个中间件示例:

class TimeoutFilter[Req, Rep](timeout: Duration, timer: Timer)
  extends SimpleFilter[Req, Rep] {
  def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
    val res = service(request)
    res.within(timer, timeout)
  }
}

这里的Servicetower非常相似:一个从请求到响应的函数。

Finagle中的中间件称为FilterFilter类型比towerLayer更复杂,因为它不要求apply中的ReqRep类型与服务参数中请求和回复的类型保持一致。

SimpleFilter,顾名思义,是具有固定请求/响应类型的简化版本。SimpleFilter将一个请求和包装服务作为参数,并返回一个响应,因此它的功能类似tower API,但是将Layer::layerService::call压缩到了单个SimpleFilter::apply方法中。

中间件类型

一般来说,你会发现,中间件API分为两类:要么是一个参数为请求和next的函数,就像surfexpress;或者从一个映射服务到另一个,就像towerFinagle.

总的来说,这两种方法都提供了同样多的灵活性。两者都需要每个中间件至少有一个额外的动态分发,因为 Rust不支持在 trait 方法的返回类型中包含impl Trait(目前),所以我们采用Next方法,因为这使得更容易实现中间件。surftower之间的差异证明了这一点。

reqwest-中间件

我们最终得到了一个非常标准的中间件API(有关API的更详细描述,请参阅文档):


#![allow(unused)]
fn main() {
#[async_trait]
pub trait Middleware {
  async fn handle(&self, req: Request, extensions: &mut Extensions, next: Next<'_>) 
   -> Result<Response>;
}
}

Extensions用于以类型安全的方式跨中间件获取任意信息,不论是从外部中间件到更深的中间件,还是从内部中间件到以前的中间件。

出于演示目的,举例一个简单的日志中间件实现:


#![allow(unused)]
fn main() {
use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next};
use truelayer_extensions::Extensions;

struct LoggingMiddleware;

#[async_trait::async_trait]
impl Middleware for LoggingMiddleware {
    async fn handle(
        &self,
        req: Request,
        extensions: &mut Extensions,
        next: Next<'_>,
    ) -> reqwest_middleware::Result<Response> {
        tracing::info!("Sending request {} {}", req.method(), req.url());
        let resp = next.run(req, extensions).await?;
        tracing::info!("Got response {}", resp.status());
        Ok(resp)
    }
}
}
use reqwest_middlewar::ClientBuilder;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let client = ClientBuilder::new(reqwest::Client::new())
        .with(LoggingMiddleware)
        .build();
    client
        .get("https://truelayer.com/")
        .send()
        .await
        .unwrap();
}
$ RUST_LOG=info cargo run
Jul 20 19:59:35.585  INFO post_reqwest_middleware: Sending request GET https://truelayer.com/
Jul 20 19:59:35.705  INFO post_reqwest_middleware: Got response 200 OK

结论

我们使用启用中间件的客户端包装reqwest,该客户端使用相同的简单API。这使得能够为我们的韧性和可观察性需求构建可重用的组件。

最重要的是,我们还发布了reqwest-retryreqwest-opentracing,它们应该能涵盖reqwest crate很多的使用场景。

开发人员现在可以通过导入几个crate并将with_middleware调用添加到客户端设置代码来强化与远程HTTP的集成——而不会中断任何其他应用程序代码。