全国服务热线:18271592020
资讯

关于长亭空间

超值服务提供卓越产品

   
  
  
新闻公告 News
   
【技术革命】JDK21虚拟线程来袭,让系统的吞吐量翻倍!
来源: | 作者:chang | 发布时间: 2023-11-03 | 166 | 分享到:

1. 虚拟线程简介

虚拟线程是一种轻量级线程,可大大减少编写、维护和观察高吞吐量并发应用程序的工作量。从JDK19开始发布了虚拟线程的预览功能,直到JDK21终确定虚拟线程。

虚拟线程既廉价(相比平台线程)又可以创建非常的多,因此绝不应池化:每个应用任务都应创建一个新的虚拟线程。因此,大多数虚拟线程的寿命都很短,调用堆栈也很浅,只需执行一次 HTTP 客户端调用或一次 JDBC 查询。相比之下,平台线程重量级、成本高,因此通常必须池化。这些线程的寿命往往较长,具有较深的调用堆栈,可在多个任务之间共享。

总之,虚拟线程保留了可靠的每请求线程风格,这种风格与 Java 平台的设计相协调,同时还能优化利用可用硬件。使用虚拟线程不需要学习新的概念,但可能需要放弃为应对当前线程的高成本而养成的习惯。虚拟线程不仅能帮助应用程序开发人员,还能帮助框架设计人员提供易于使用的 API,这些 API 与平台设计兼容,同时又不影响可扩展性。

虚拟线程是 java.lang.Thread 的一个实例,它在底层操作系统线程上运行 Java 代码,但在代码的整个生命周期中不会捕获操作系统线程。这意味着许多虚拟线程可以在同一个操作系统线程上运行 Java 代码,从而有效地共享操作系统线程。平台线程会垄断宝贵的操作系统线程,而虚拟线程不会。虚拟线程的数量可能远远大于操作系统线程的数量。

虚拟线程是线程的一种轻量级实现,由 JDK 而不是操作系统提供。它们是用户模式线程的一种形式,在其他多线程语言(如 Go 中的 goroutines(协程(轻量级线程)) 和 Erlang 中的进程)中取得了成功。用户模式线程在 Java 早期版本中甚至被称为 "绿色线程",当时操作系统线程尚未成熟和普及。然而,Java 的绿色线程都共享一个操作系统线程(M:1 调度),终被作为操作系统线程包装器(1:1 调度)实现的平台线程所超越。虚拟线程采用 M:N 调度,即大量(M)虚拟线程被安排运行在较少数量(N)的操作系统线程上。

虚拟线程是 java.lang.Thread 的一个实例,与特定操作系统线程无关。相比之下,平台线程是以传统方式实现的 java.lang.Thread 实例,是操作系统线程的薄包装。

2. 传统请求线程模型

通常服务器应用程序处理相互独立的并发请求时,在请求的整个持续声明周期内为该请求指定一个线程来处理该请求。这种按请求线程的风格易于理解、易于编程、易于调试和配置。

对于一个请求处理的处理时间,应用程序同时处理的请求数(即并发数)必须与吞吐量成比例增长。例如,假设一个平均延迟为 50 毫秒的请求并发处理 10 个请求,实现了每秒 200 个请求的吞吐量。若要将该应用的吞吐量提高到到每秒 2000 个请求,则需要并发处理 100 个请求。如果每个请求在请求持续时间内都由一个线程处理,那么要使应用程序跟上进度,线程数必须随着吞吐量的增加而增加。

由于 JDK 将线程作为操作系统(OS)线程的包装器来实现。操作系统线程的成本很高,所以我们不能拥有太多的线程,这就使得线程的实现不适合按请求执行的方式。如果每个请求在其生命周期内都要使用一个线程,也就是一个操作系统线程,那么在 CPU 或网络连接等其他资源耗尽之前,线程数量往往就已经成为限制因素了。JDK 当前的线程实现将应用程序的吞吐量限制在远低于硬件支持的水平。即使对线程进行了池化,也会出现这种情况,因为池化有助于避免启动新线程的高昂成本,但不会增加线程总数。

3. 虚拟线程使用

使用方式1:

复制
// 创建一个执行器,为每个任务启动一个新的虚拟线程 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    IntStream.range(0, 10_000).forEach(i -> {
        executor.submit(() -> {
            Thread.sleep(Duration.ofSeconds(1)); return i; }); }); }
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

本例中的任务是简单的代码--休眠1秒--现代硬件可以轻松支持 10,000 个虚拟线程同时运行此类代码。而实际上,JDK 只在少量操作系统线程(可能只有一个)上运行此代码代码。

如果该程序使用 ExecutorService(例如 Executors.newCachedThreadPool())为每个任务创建一个新的平台线程,情况就会截然不同。ExecutorService 会尝试创建 10,000 个平台线程,从而创建 10,000 个操作系统线程,根据机器和操作系统的不同,程序可能会崩溃。

即便使用Executors.newFixedThreadPool(200)创建固定数量的线程,情况也不会好到哪里去。ExecutorService 将创建 200 个平台线程,供所有 10,000 个任务共享,因此许多任务将顺序运行而非并发运行,程序将需要很长时间才能完成。对于该程序而言,拥有 200 个平台线程的池每秒只能完成 200 个任务,而虚拟线程每秒可完成约 10,000 个任务(经过充分预热后)。此外,如果将示例程序中的 10_000 改为 1_000_000,那么程序将提交 1,000,000 个任务,创建 1,000,000 个虚拟线程并发运行,(充分预热后)吞吐量将达到每秒约 1,000,000 个任务。

注意:如果程序中的任务在一秒钟内执行计算(例如对一个巨大的数组进行排序),而不仅仅是休眠,那么增加线程数超过处理器内核数将无济于事,无论它们是虚拟线程还是平台线程。虚拟线程不是更快的线程--它们运行代码的速度并不比平台线程快。它们的存在是为了提供规模(更高的吞吐量),而不是速度(更低的延迟)。虚拟线程的数量可能比平台线程多得多,因此根据利特尔定律,虚拟线程可以提供更高吞吐量所需的更高并发性。

使用方式2:

手动创建虚拟线程

复制
// 创建虚拟线程 OfVirtual virtual = Thread.ofVirtual().name("pack") ; virtual.start(() -> {
  System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ; }) ; // 创建不自动启动的线程 Thread thread = virtual.unstarted(() -> {
  System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ; }) ; // 手动启动虚拟线程 thread.start() ; // 打印线程对象:VirtualThread[#21,pack]/runnable System.out.println(thread) ; // 创建普通线程 OfPlatform platform = Thread.ofPlatform().name("pack") ; Thread thread = platform.start(() -> {
  System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ; }) ; // 这里输出:Thread[#21,pack,5,main] System.out.println(thread) ;
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.

在上面的代码中,打印thread输出的不是对应的平台线程,而是虚拟线程

复制
VirtualThread[#21,pack]/runnable
				
  • 1.

在执行的任务中通过Thread.currentThread().getName()方法是没有任何信息,我们可以通过上面的name()方法来设置线程的名称及相关的前缀。如下:

复制
Thread.ofPlatform().name("pack") ; Thread.ofVirtual().name("pack", 0) ;
				
  • 1.
  • 2.

使用方式3:

通过ThreadFactory工厂创建

复制
ThreadFactory threadFactory = Thread.ofVirtual().factory() ; threadFactory.newThread(() -> {
  System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ; }).start() ;
				
  • 1.
  • 2.
  • 3.
  • 4.

使用方式4:

直接通过Thread静态方法

复制
Thread.startVirtualThread(() -> {
  System.out.printf("%s - 任务执行完成", Thread.currentThread().getName()) ; }) ;
				
  • 1.
  • 2.
  • 3.

4. 虚拟线程与传统线程池对比

使用虚拟线程

复制
public class Demo06 {


  static class Task implements Runnable { @Override public void run() {
      System.err.printf("start - %d%n", System.currentTimeMillis()) ; try {
        Thread.sleep(Duration.ofSeconds(1)); } catch (InterruptedException e) {}
      System.err.printf("  end - %d%n", System.currentTimeMillis()) ; }
  } public static void main(String[] args) throws Exception {
    ExecutorService es= Executors.newVirtualThreadPerTaskExecutor() ; es.submit(new Task()) ; es.submit(new Task()) ; es.submit(new Task()) ; System.in.read() ; }


}
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.

输出结果:

复制
start - 1698827467289 start - 1698827467289 start - 1698827467291 end - 1698827468317 end - 1698827468317 end - 1698827468317
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

从结果看出,基本是同时开始,结束也是基本一起结束,总耗时1s。

使用传统线程

任务都一样,只是创建线程池的类型修改

复制
public static void main(String[] args) throws Exception {
  ExecutorService es= Executors.newFixedThreadPool(1) ; es.submit(new Task()) ; es.submit(new Task()) ; es.submit(new Task()) ; }
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

输出结果:

复制
start - 1698827686133 end - 1698827687165 start - 1698827687165 end - 1698827688177 start - 1698827688177 end - 1698827689178
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

从结果知道这里是一个任务一个任务的执行串行化,但是你注意观察,其实每个任务的的开始start 的输出都是要等前一个线程执行完了后才能执行。结合上面的虚拟线程对比,start是同时输出的,这也是虚拟线程的有点了。

5. 使用案例

这是一个远程接口调用的示例:

远程3个接口,如下:

复制
@GetMapping("/userinfo") public Object queryUserInfo() {
  try {
    TimeUnit.SECONDS.sleep(2) ; } catch (InterruptedException e) {e.printStackTrace();} return "查询用户信息" ; } @GetMapping("/stock") public Object queryStock() {
  try {
    TimeUnit.SECONDS.sleep(3) ; } catch (InterruptedException e) {e.printStackTrace();} return "查询库存信息" ; } @GetMapping("/order") public Object queryOrder() {
  try {
    TimeUnit.SECONDS.sleep(4) ; } catch (InterruptedException e) {e.printStackTrace();} return "查询订单信息" ; }
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.

接口调用服务,如下:

复制
@Resource private RestTemplate restTemplate ; public Map<String, Object> rpc() {


  try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    var start = System.currentTimeMillis() ; // 1.查询用户信息 var userinfo = executor.submit(() -> query("http://localhost:8080/demos/userinfo")); // 2.查询库存信息 var stock = executor.submit(() -> query("http://localhost:8080/demos/stock")); // 3.查询订单信息 var order = executor.submit(() -> query("http://localhost:8080/demos/order")); Map<String, Object> res = Map.of("userinfo", userinfo.get(), "stock", stock.get(), "order", order.get()) ; System.out.printf("总计耗时:%d毫秒%n", (System.currentTimeMillis() - start)) ; return res ; } catch (Exception e) { return Map.of() ; }
}
private Object query(String url) { return this.restTemplate.getForObject(url, String.class) ; }
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.

在这个案例中,如果使用传统的线程池,如果并发量大,那么很可能很多的任务都要排队等待,或者你需要创建更多的平台线程来满足吞吐量问题。但是现在有了虚拟线程你可以不用再考虑线程不够用的情况了,每个任务的执行都会被一个虚拟的线程执行(不是平台线程,可能这些虚拟线程只会对应到一个平台线程)。

虚拟线程可在以下情况显著提高应用吞吐量:

  • 并发任务的数量很高(超过几千)
  • 工作负载不受cpu限制,因为在这种情况下,线程比处理器内核多并不能提高吞吐量

6. 结构化并发(预览功能)

结构化并发目前还是预览功能,并没有在JDK21中正式发布,不过我们可以先来看看什么是结构化并发。

结构化并发 API 是来简化并发编程。结构化并发将在不同线程中运行的一组相关任务视为一个工作单元,从而简化了错误处理和取消,提高了可靠性,并增强了可观察性。

结构化并发的目标是:

  • 推广一种并发编程风格,消除因取消和关闭而产生的常见风险,如线程泄漏和取消延迟。
  • 提高并发代码的可观察性。

通过示例来理解结构化并发。

如下示例是通过传统线程池的方式并发的从远程获取信息,代码如下:

复制
static RestTemplate restTemplate = new RestTemplate() ; public static void main(String[] args) throws Exception {
  ExecutorService es = Executors.newFixedThreadPool(2) ; Future<Object> userInfo = es.submit(UnstructuredConcurrentDemo::queryUserInfo) ; Future<Object> stock = es.submit(UnstructuredConcurrentDemo::queryStock) ; Object userInfoRet = userInfo.get() ; System.out.printf("执行结果:用户信息:%s%n", userInfoRet.toString()) ; Object stockRet = stock.get() ; System.out.printf("执行结果:库存信息:%s%n", stockRet.toString()) ; } public static Object queryUserInfo() { return restTemplate.getForObject("http://localhost:8080/demos/userinfo", String.class) ; } public static Object queryStock() { return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ; }
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.

上面的代码中没有什么问题,程序都能够运行的正常,结果如下:

复制
08:49:53.502 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK 08:49:53.504 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8" 执行结果:用户信息:查询用户信息 08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK 08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8" 执行结果:库存信息:查询库存信息
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

但是如果其中一个任务执行失败了后会如何呢?将其中一个任务抛出异常,如下代码:

复制
public static Object queryStock() {
  System.out.println(1 / 0) ; return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ; }
				
  • 1.
  • 2.
  • 3.
  • 4.

再次执行代码,结果如下:

复制
发生异常:java.lang.ArithmeticException: / by zero 09:06:05.938 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock 09:06:05.948 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*] 09:06:08.972 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK 09:06:08.974 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8" 执行结果:库存信息:查询库存信息
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

从结果看出,获取用户信息子任务发生异常后,并不会影响到获取库存子任务的执行。

通过结构化并发方式

复制
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
  Supplier<Object> userInfo = scope.fork(UnstructuredConcurrentDemo::queryUserInfo) ; Supplier<Object> stock = scope.fork(UnstructuredConcurrentDemo::queryStock) ; // 等待在此任务范围内启动的所有子任务完成或某个子任务失败。 scope.join() ; Object userInfoRet = userInfo.get() ; System.out.printf("执行结果:用户信息:%s%n", userInfoRet.toString()) ; Object stockRet = stock.get() ; System.out.printf("执行结果:库存信息:%s%n", stockRet.toString()) ; }
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.

当一个子任务发生错误时,其它的子任务会在未完成的情况下取消,执行结果如下:

复制
08:59:51.951 [] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock 08:59:51.961 [] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*] Exception in thread "main" java.lang.IllegalStateException: Subtask not completed or did not complete successfully
  at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.get(StructuredTaskScope.java:936) at com.pack.rpc.UnstructuredConcurrentDemo.structured(UnstructuredConcurrentDemo.java:26) at com.pack.rpc.UnstructuredConcurrentDemo.main(UnstructuredConcurrentDemo.java:17)
				
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

从控制台的输出看出,获取库存的调用被取消了。

完毕!!!