使用协程进行简单系统优化

背景

目前的业务场景中,有这样的需求:给上游提供一个批量接口,然后在本系统中,将批量请求拆成单独的请求,每个请求使用一个单独的新线程进行简单包装后,调用下游进行处理,而下游提供的服务由于处理逻辑较多,所以响应速度不快,需要大约大几百毫秒。

目前提供给上游的接口,最多一批处理一百个任务,目前每个任务都会拆成一个单独的线程,也就是上游的一次调用最多会占用100个线程,如果当前系统最大支持nQPS,下游接口的平均RT为m秒,那么光这一个接口,就需要线程池中提供100nm个线程。

例:单机支持500QPS,下游每个接口响应时间为0.1s,那么waybilltemplate在这一个接口上就需要100 * 100 * 0.1 = 1000个线程。

分析

对于下游来说,渲染性能可以不断优化或通过扩容解决流量增长的问题,可以假设平均响应RT不变,此处暂且不考虑。而对于本系统来说,随着流量上涨,在不扩容情况下,单机QPS将会逐步上涨,也就意味着上文计算的线程数将不断上涨。。

线程对于操作系统来说是非常昂贵的资源,对于Java应用来说,Java线程与操作系统线程是一一对应的关系,线程数不断增长主要有两个问题:

  1. 每个线程占用固定大小的内存,线程越多,消耗内存越多。默认情况下,一个线程将分配最多1MB的栈空间
  2. 虽然此处为IO密集型场景,但是线程数过多时,仍会导致CPU资源大量浪费在线程切换过程中。由于大量线程堆积势必会导致CPU load快速增长,直至影响到RT

很多人会认为,如果流量上涨导致CPU不断上升,那扩容就行了,用更多机器来支撑。如果对于耗费CPU资源的接口来说,这样处理是合理的,但是前文也提过,这个系统属于IO密集型,所以CPU使用量应该是可以控制在一个较低的程度才对。如果这样的业务系统流量上涨导致CPU飙高、RT降低,简单通过扩容来解决问题,那从资源上是一种莫大的浪费。

其实这个问题的解决,跟接受网络请求时用NIO解决BIO的缺陷思路一致,BIO需要给每个请求分配一个线程,对于目前动辄单机几千几万QPS的应用来说,分配这么多线程显然是不能的,所以出现NIO这种多路复用的思路,来降低单机的线程数。

目前用多线程请求下游是为了提高接口的吞吐,避免排队请求下游。但是由于网络请求是阻塞式的,线程必须阻塞在RPC接口请求处等待响应,虽然等待期间会切换至其他线程,不消耗CPU资源(不考虑线程切换的开销),但是当前线程是无法被复用的,造成了线程资源的浪费。

那么有没有办法在线程被阻塞的时候,反正闲着也是闲着,把当前线程利用起来,去做其他事情呢?

首先能想到的解决方案就是异步调用,利用Rxjava之类。在发送完RPC请求后,立刻返回,结束当前线程调用。等下游处理完请求并响应时,RPC进行异步回调。但是使用异步代码,往往会把代码可读性写的很差。回调实现的代表语言就是NodeJS回调地狱(Callback Hell)就是众多开发者对其大量回调使用的吐槽。

如果想要使用更简单更直白的方式解决问题,对于golang或Python有了解的同学肯定下意识能想到——协程(也称纤程)。

协程

介绍

关于协程的介绍,引用一段廖雪峰的官方网站中的描述:

协程,又称微线程,纤程。英文名Coroutine。

协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。

所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。

子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。

协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

1
2
3
4
5
6
7
8
9
def A():
print '1'
print '2'
print '3'

def B():
print 'x'
print 'y'
print 'z'

假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:

1
2
3
4
5
6
1
2
x
y
3
z

但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。

看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势?

最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

协程方案

上文提到过,目前主流使用了协程技术语言有Golang、Lua等一类,尤其是GO,这几年可谓是最火的语言之一,其热门的原因之一就是其原生很好地实现了协程,其协程实现命名为Goroutine

但这是其他语言的优势,作为Java开发者,能不能不换语言来拥抱新技术呢?

能。

目前在JVM生态下,比较主流的有如下几种协程实现方案:

  1. 华为JDK
  2. 阿里JDK——dragonwell8 Wisp2
  3. Loom
  4. Kilim
  5. Quasar
  6. Kotlin
  7. Scala

其中1、2两种方案均需要使用特定JDK,华为版协程并没有开源,所以没法了解一下细节。Wisp的细节在文档中有所提及,其在JDK层面对一些JDK中的并发类进行了改造,让开发者可以在不修改代码情况下,将Thead自动转换为Coroutine进行执行。目前在阿里内部,已经有不少应用引入了Wisp,由于需要升级JDK版本,并修改JVM启动参数,并且其会透明影响所有线程(也可以在JVM参数中指定需要使用的类),所以对老项目进行改造时,需要谨慎并进行详尽测试与试跑。

Loom是OpenJDK的官方协程项目,但是离正式商用应该还遥遥无期。

4、5两个项目均为通过字节码增强来实现协程,两者最大的差异是Kilim使用静态字节码增强,使用maven plugin等方式在编译期进行增强;而Quasar使用Java agent进行动态增强,据说约有5%的性能损耗。由于笔者没有使用过这两个库,所以就不多做介绍了,感兴趣的读者可以自行在官方文档中进行了解。

6、7就已经不是Java语言的范畴了,但是把它们列在此处的原因是其仍属于JVM范畴。既然是JVM语言,意味着其仍可以使用Java生态下基本所有的组件,当然包括与Java或其他JVM语言互相调用。

改造

上文中,介绍了协程以及目前比较主流的几种协程使用方式。接下来将会简单描述一下本次优化经历。这在之前,先简述一下本次改造的技术选型与背后的原因。

上文提到的其中方案,剔除没法使用的1和3,两年前就已经没怎么维护的Kilim、原理上复杂度较高且有性能损失的Quasar,就只剩下Wisp、Kotlin、Scala了。最终笔者选择了Kotlin方案,原因是Scala的语法有些过分复杂,做业务开发的实在很少使用;Wisp需要改JDK,动静有点大,所以就都被暂时抛弃了。

Kotlin给Java开发者不光带来了协程,还带来了很多语法糖,解决了Java开发中的很多痛点,例如NPE问题等。而且在老项目中基本可以无痛引入,与老代码几乎完全兼容,所以不光在协程场景下,在其他业务场景也可以提高开发效率。

RPC对协程支持

由于作为改造案例的RPC框架是所在公司内部的框架,所以这里不能直接展示具体代码,很多地方的代码只能用伪代码代替,见谅。

因为使用的RPC框架没有提供协程支持(suspend方法),如果代码一运行到RPC接口调用处,啪,直接阻塞了,那无论是协程还是线程,都解决不了问题。所以需要先对RPC做简单改造。

还好,目前使用的RPC框架虽然没有提供协程支持,但是其支持异步化调用,那稍稍进行改造,就可以将其变为suspend方法。目前市面上大多数远程服务调用都能直接或间接进行异步操作,所以并不受限于某个特殊的RPC框架。

HSF文档中需要将接口改造为异步,需要注册一个新的异步接口,可以详见文档,但此处用了另一种方式,也是上文提到过的FutureInvocationHelper工具类。

FutureInvocationHelper提供了futureCall方法,其入参为实际的HSF方法引用,返回值为一个Future,调用其addListener可以注册一个回调方法,至此就实现了异步化调用。

找到了HSF的异步化支持以后,就要将其改造为供协程使用的suspend方法,即将异步转化为suspend。

此处需要借助Kotlin提供的kotlinx.coroutines.channels.Channel类,这个类有点类似与容量只有1的BlockingQueue,但其消费方法(receive)并不是一个阻塞方法,而是一个suspend方法。再向HSF注册的一个回调监听器,待服务端数据返回时将数据传入Channel中即可。

简单改造,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
abstract class RpcAsyncHelper {

abstract suspend fun <R> invoke(action: () -> R): R?

companion object Default : RpcAsyncHelper() {

override suspend fun <R> invoke(action: () -> R): R? {
val channel = Channel<R>()
// FutureInvocationHelper.futureCall是目前使用RPC框架的执行方法的包装工具类,返回结果类似与java.util.concurrent.Future
val future = FutureInvocationHelper.futureCall(action)
// future.addListener 用来注册RPC执行成功后的监听器
future.addListener {
// 通过future.get 可以获得RPC调用的结果
GlobalScope.launch { channel.send(future.get()) }
}
return channel.receive()
}
}

}

业务改造

为了方便调用,先增加了一个包装类,将原来需要调用的RPC方法包装一下方便业务中使用,代码如下:

1
2
3
4
5
6
7
8
9
10
11
@Component
class AServiceAsyncWrapper {

@Autowired
val aService: AService? = null

suspend fun invoke(params: List<Parameter>): List<String> {
return RpcAsyncHelper.invoke { aService!!.invoke(params) } ?: listOf()
}

}

其中AService为RPC接口,直接通过Spring注入到包装类中,而invoke方法中包装了原来对RPC接口的调用。

在业务代码中,就可以直接调用了,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun batchInvoke(parameterList: List<Parameter>?): List<String> {
val wrapper: AServiceAsyncWrapper = AServiceAsyncWrapper()
val deferredList = mutableListOf<Deferred<List<String>>>()
val resultList = mutableListOf<String>()
runBlocking(Dispatchers.IO) {
coroutineScope {
for (data in parameterList!!) {
deferredList.add(async { wrapper.invoke(listOf(data)) })
}

for (deferred in deferredList) {
resultList.addAll(deferred.await())
}
}
}
return resultList
}

总结

通过上述代码,可以简单将原本同步/异步调用的RPC接口改造成协程,并且在使用时没有写异步代码时各种回调的困扰,通过同步写法实现了与异步回调几乎一致的性能。

通过最终的实测效果,当系统处于低水位的情况时,如果一个系统内存在大量IO密集型的多线程任务,使用协程可以降低系统的CPU使用。由于环境限制导致笔者暂时没有模拟高水位情况下的效果,也就是目前线程的频繁切换并不是系统的瓶颈,所以接口的RT并没有明显优化效果。

不过需要注意,对于计算密集型的任务,由于CPU计算资源是系统的短板,所以协程并不能明显提升运行效率(多线程也不能,因为频繁切换线程仍会是无用功)。

如果这篇文章对你有帮助,可以请作者喝杯咖啡~