微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Dubbo 压测插件 2.0 —— 基于普通 API 调用

 有赞技术 有赞coder

插件已开源,详见 gatling-dubbo:https://github.com/youzan/gatling-dubbo.git

上一篇《Dubbo压测插件的实现——基于Gatling》中,我们介绍了基于 dubbo 泛化调用实现的 gatling dubbo 压测插件,使用泛化调用发起 dubbo 压测请求,consumer 端不需要拿到 provider 端的 API 包,使用上很便利,但是众所周知,dubbo 泛化调用性能不如普通 API 调用,虽然可以优化并使之达到与普通 API 调用相近的性能,但仍存在一些局限性。生产中除了网关等特殊应用外,一般很少使用泛化调用,如果以泛化调用性能来表征生产中普通 API 调用性能,其压测结论很难令人信服。做压测的时候,一般要求各种条件如环境等都尽可能保持一致。所以,我们又开发了基于普通 API 调用gatling dubbo 压测插件,即 gatling-dubbo2.0。此外,依托于 gatling 强大的基础能力, gatling-dubbo2.0 相比于 Jmeter 还存在以下几方面的优势:


  • 更强的场景编排能力,支持多场景同时编排,如仿真电商业务中同时存在普通下单、团购、秒杀等多种交易类型的场景

  • 支持设置场景内流量模型,如漏斗模型,仿真用户从商品浏览 -> 加入购物车 -> 下单 -> 支付过程中的各级转化率

  • 不需要安装额外插件,原生支持设置压力模型,如设置压测需要达到的目标 RPS,甚至逐级加压进行梯度压力测试

  • 更低的资源消耗,更高的并发能力


一、插件主要组成

Action 和 ActionBuild
执行部分,这里的作用是发起 dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类


Check 和 CheckBuild
校验部分,全链路压测中我们使用 json path 校验 HTTP 请求结果,这里我们实现了一样的校验方法,而且,对于一些不规范的返回结果(如返回了基本数据类型),还增加自定义校验方法。CheckBuild 则为 DSL 使用 Check 的辅助类。


DSL
插件的领域特定语言,提供简单易用的 API 方便编写 dubbo 压测脚本。

图片


1.1 Action

dubboAction 包含了发起 dubbo 请求、请求结果校验以及压力控制逻辑,需要扩展 gatling 的 ExitableAction 并实现 execute 方法


dubboAction 的入参 f 是一个函数,从压测脚本传入,函数负责组织 dubbo 请求,从 session 中取值并动态构造请求参数。这一过程类似于使用 Jmeter 压测 Java 接口,即扩展 AbstractJavaSamplerClient。所以,gatling-dubbo 2.0 也支持dubbo 的其他 java 调用压测,因为 f 怎么写的控制权完全掌握在写压测脚本的人手里(本质上,远程调用和本地调用的客户端使用方式上并没有区别)。


所有虚拟用户以并发方式执行 execute 方法,每个用户又以异步方式执行 dubbo 请求,且无论请求是否正确返回,都需要记录相应的成功或失败日志,失败可能是由于请求失败了,也可能是请求成功了,但是校验请求结果失败了。下一步就是准备发起新的 dubbo 请求,如果开启了 Rps 阀门(throttled),则会根据当前的 Rps 和 Rps 阀门阈值动态调整发送请求的频率,在施压机(consumer)未达到性能瓶颈的情况下,可以很稳定的保持在设置的 Rps 目标值上进行压测。如果 Rps 阀门未开启,则直接发起新的 dubbo 请求(通过 AKKA Message 触发)。


  1. class dubboAction[A]( requestName:      Expression[String],

  2.                      f:                (Session) => A,

  3.                      val executor:     ExecutorService,

  4.                      val objectMapper: ObjectMapper,

  5.                      checks:           List[dubboCheck],

  6.                      coreComponents:   CoreComponents,

  7.                      throttled:        Boolean,

  8.                      val next:         Action

  9.                    ) extends ExitableAction with NameGen {

  10.  ......

  11.  override def execute(session: Session): Unit = recover(session) {

  12.    requestName(session) map { reqName =>

  13.      val startTime = System.currentTimeMillis()

  14.      val fu = Future {

  15.        try {

  16.          f(session)

  17.        } finally {

  18.        }

  19.      }


  20.      fu.onComplete {

  21.        case Success(result) =>

  22.          val endTime = System.currentTimeMillis()

  23.          val resultJson = objectMapper.writeValueAsstring(result)

  24.          val (newSession, error) = Check.check(resultJson, session, checks)

  25.          error match {

  26.            case None =>

  27.              statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("OK"), None, None)

  28.              throttle(newSession(session))

  29.            case Some(Failure(errorMessage)) =>

  30.              statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage))

  31.              throttle(newSession(session).markAsFailed)

  32.          }


  33.        case UFailure(e) =>

  34.          val endTime = System.currentTimeMillis()

  35.          statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage))

  36.          throttle(session.markAsFailed)

  37.      }

  38.    }

  39.  }


  40.  private def throttle(s: Session): Unit = {

  41.    if (throttled) {

  42.      coreComponents.throttler.throttle(s.scenario, () => next ! s)

  43.    } else {

  44.      next ! s

  45.    }

  46.  }

  47. }


dubboActionBuilder 负责创建线程池并实例化 dubboAction:

case class dubboActionBuilder[A](requestName: Expression[String], f: (Session) => A, checks: List[dubboCheck], threadPoolSize: Int) extends ActionBuilder {  override def build(ctx: ScenarioContext, next: Action): Action = {    import ctx._    val executor = Executors.newFixedThreadPool(threadPoolSize)    val objectMapper: ObjectMapper = new ObjectMapper()    new dubboAction[A](requestName, f, executor, objectMapper, checks, coreComponents, throttled, next)  }}


LambdaProcessBuilder 提供了设置 check 条件的 DSL 和 设置线程池大小的 DSL:

有赞的施压机是 4 核 8Gb 内存的,我们为其设置的认线程池大小为 200,与 dubbo 应用部署环境一致。你可以使用 DSL threadPoolSize(threadPoolSize: Int) 按照你的机器配置设置一个合适的线程池大小。如果施压机成了性能瓶颈,你可以考虑将其改造成集群来施压,具体可参考《有赞全链路压测引擎的设计与实现》

  1. case class dubboProcessBuilder[A](requestName: Expression[String], f: (Session) => A, checks: List[dubboCheck] = Nil, threadPoolSize: Int = 200) extends dubboCheckSupport {

  2.  def check(dubboChecks: dubboCheck*): dubboProcessBuilder[A] = copy[A](checks = checks ::: dubboChecks.toList)


  3.  def threadPoolSize(threadPoolSize: Int): dubboProcessBuilder[A] = copy[A](threadPoolSize = threadPoolSize)


  4.  def build(): ActionBuilder = dubboActionBuilder[A](requestName, f, checks, threadPoolSize)

  5. }


1.2 Check

全链路压测中,我们使用 json path 校验 HTTP 请求结果,dubbo 压测插件中,我们也实现了基于 json path 的校验方法

  1. package object dubbo {

  2.  type dubboCheck = Check[String]


  3.  val dubboStringExtender: Extender[dubboCheck, String] =

  4.    (check: dubboCheck) => check


  5.  val dubboStringPreparer: Preparer[String, String] =

  6.    (result: String) => Success(result)

  7. }

  1. trait dubboJsonPathOfType {

  2.  self: dubboJsonPathCheckBuilder[String] =>


  3.  def ofType[X: JsonFilter](implicit extractorFactory: JsonPathExtractorFactory) = new dubboJsonPathCheckBuilder[X](path, jsonParsers)

  4. }


  5. object dubboJsonPathCheckBuilder {

  6.  val CharsParsingThreshold = 200 * 1000


  7.  def preparer(jsonParsers: JsonParsers): Preparer[String, Any] =

  8.    response => {

  9.      if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson)

  10.        jsonParsers.safeParseJackson(response)

  11.      else

  12.        jsonParsers.safeParseBoon(response)

  13.    }


  14.  def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =

  15.    new dubboJsonPathCheckBuilder[String](path, jsonParsers) with dubboJsonPathOfType

  16. }


  17. class dubboJsonPathCheckBuilder[X: JsonFilter](

  18.    private[check] val path:        Expression[String],

  19.    private[check] val jsonParsers: JsonParsers

  20. )(implicit extractorFactory: JsonPathExtractorFactory)

  21.  extends DefaultMultipleFindCheckBuilder[dubboCheck, String, Any, X](

  22.    dubboStringExtender,

  23.    dubboJsonPathCheckBuilder.preparer(jsonParsers)

  24.  ) {

  25.  import extractorFactory._


  26.  def findExtractor(occurrence: Int) = path.map(newSingleExtractor[X](_, occurrence))

  27.  def findAllExtractor = path.map(newMultipleExtractor[X])

  28.  def countExtractor = path.map(newCountExtractor)

  29. }


但有时候存在一些不规范的情况,dubbo 接口的返回结果并不能直接转化为 json,如返回了基本数据类型,所以我们还提供了自定义校验方法,可以将这样的返回结果转化为 String 类型,并使用字符串比较、正则表达式匹配等方法校验返回结果:


case class dubboCustomCheck(func: String => Boolean, failureMessage: String = "dubbo check Failed") extends dubboCheck {  override def check(response: String, session: Session)(implicit cache: mutable.Map[Any, Any]): Validation[CheckResult] = {    func(response) match {      case true => CheckResult.NoopCheckResultSuccess      case _    => Failure(failureMessage)    }  }}


dubboCheckSupport 则提供了 json pathcustom 两种检验方式的 DSL

  1. trait dubboCheckSupport {

  2.  def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =

  3.    dubboJsonPathCheckBuilder.jsonPath(path)


  4.  def custom = dubboCustomCheck

  5. }


dubbo 压测脚本中可以设置一个或多个 check 来校验请求结果

1.3 DSL

dubboDsl 提供顶层 DSL,隐式方法 dubboProcessBuilder2ActionBuilderScala 用于自动dubboProcessBuilder 构造 ActionBuilder

  1. trait dubboDsl extends dubboCheckSupport {

  2.  def dubbo[A](requestName: Expression[String], f: (Session) => A) = dubboProcessBuilder[A](requestName, f)


  3.  implicit def dubboProcessBuilder2ActionBuilder[A](builder: dubboProcessBuilder[A]): ActionBuilder = builder.build()

  4. }


二、示例

2.1 压测脚本示例

  1. class Mix extends Simulation {

  2.  val application = new ApplicationConfig()

  3.  application.setName("gatling-dubbo")


  4.  // 初始化 AService

  5.  val referenceAService = new ReferenceConfig[AService]

  6.  referenceAService.setApplication(application)

  7.  referenceAService.setUrl("dubbo://IP:PORT/com.xxx.service.AService")

  8.  referenceAService.setInterface(classOf[AService])

  9.  val aService = referenceAService.get()


  10.  // 初始化 BService

  11.  val referenceBService = new ReferenceConfig[BService]

  12.  referenceBService.setApplication(application)

  13.  referenceBService.setUrl("dubbo://IP:PORT/com.yyy.service.BService")

  14.  referenceBService.setInterface(classOf[BService])

  15.  val bService = referenceBService.get()


  16.  // 设置数据源

  17.  val jsonFileFeeder = jsonFile("data.json").shuffle.circular

  18.  val mixScenario = scenario("scenario of mix")

  19.      .forever("tripsCount") {

  20.      Feed(jsonFileFeeder)

  21.        .randomSwitch(11d -> exec(

  22.          dubbo("com.xxx.service.AService.aMethod", fAMethod)

  23.            .check(jsonPath("$.success").is("true"))

  24.        )

  25.        )

  26.        .randomSwitch(4d -> exec(

  27.          dubbo("com.yyy.service.BService.bMethod", fBMethod)

  28.            .check(jsonPath("$.success").is("true"))

  29.        )

  30.        )

  31.        .randomSwitch(5d -> exec(

  32.          ......

  33.        )

  34.        ......

  35.        )

  36.    }


  37.  setUp(mixScenario.inject(constantUsersPerSec(100) during (10 seconds)).throttle(reachRps(1000) in (1 seconds), holdFor(120 seconds)))


  38.  // 设置 aMethod 的请求参数并调用

  39.  def fAMethod(session: Session): Object = {

  40.    val aParam = new Aparam()

  41.    aParam.setName("A Name");

  42.    // 从 session 中获取动态参数并设置

  43.    aParam.setAId(session.attributes("aId").asInstanceOf[Integer].toLong);

  44.    aService.aMethod(aParam);

  45.  }


  46.  // 设置 bMethod 的请求参数并调用

  47.  def fBMethod(session: Session): Object = {

  48.    val bParam = new Bparam()

  49.    bParam.setAge(26)

  50.    // 从 session 中获取动态参数并设置

  51.    bParam.setBId(session.attributes("bId").asInstanceOf[Integer].toLong)

  52.    bService.bMethod(bParam);

  53.  }


  54.  def fXxx(session: Session): Object = {

  55.    ......

  56.  }

  57. }


randomSwitch 的作用:
以上示例其实是 gatling-dubbo 在有赞的一个典型使用场景,即评估一个应用的单实例性能。按生产环境真实的接口调用比例请求各个接口(该比例由场景执行各个请求的概率分布模拟),这样的压测结果就可以真实反映生产环境应用的单实例性能,并为容量报警、生产扩容等提供参考依据。


2.2 压测数据示例

[  {    "aId": 160,    "bId": 859296  },  {    "aId": 160,    "bId": 1019040  },  {    "aId": 160,    "bId": 1221792  },  ......]


压测数据使用 Json 数组保存,其中每一个 Json 对象都包含了一次压测请求所需的所有动态参数,且为了方便通过 session 设置动态参数,Json 对象中不再嵌套其他 Json 对象。


2.3 压测报告示例

1、应用基线性能评估,用于精准扩容:

image.png


 

2、中心化限流效果验证:

image.png


版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐