如何使用一个Java对象作为同步锁

本文为翻译稿,翻译自How to Synchronize Blocks by the Value of the Object in Java

以下为翻译正文,译者英文水平有限,有些复杂语句没有直译,而是翻译成了自己理解的内容。

问题

有时候,我们需要通过一个变量来同步一个代码块。

为了理解遇到的问题,我们假设有一个银行业务系统,系统中的转账业务在每次客户端操作时都需要做如下的操作:

  1. 通过一个外部系统的服务(CashBackService)计算转账时的返现(cash back)金额
  2. 在数据库中执行转账操作(AccountService)
  3. 在返现计算系统中更新数据

这个现金转账操作流程大概如下:

1
2
3
4
5
6
7
public void withdrawMoney(UUID userId, int amountOfMoney) {
synchronized (userId) {
Result result = externalCashBackService.evaluateCashBack(userId, amountOfMoney);
accountService.transfer(userId, amountOfMoney + result.getCashBackAmount());
externalCashBackService.cashBackComplete(userId, result.getCashBackAmount());
}
}

这个系统中的基本组件大概如下图所示:

component diagram of the application

我尽可能把这个例子简单化。这个现金转账操作在支付服务中依赖其他两个服务:

  • 第一个依赖的服务是CashBackService,这个服务跟一个外部的web系统通过REST协议进行。此外,为了计算真实的返现额,我们需要在这个应用中同步事务。这么做的原因是用户的下一笔返现金额依赖于用户账户总额。译者注:下一笔返现金额依赖与上一笔返现后的余额,所以两次返现需要进行同步操作
  • 第二个依赖的服务是AccountService,这个服务与内部的数据库和与用户账户相关的存储数据进行通讯。在这个服务中,我们可以使用JPA事务来保证一些操作的原子性。

在现实场景中,我极度建议在有条件的情况下,通过重构来避免这样的场景。但是在这个例子中,假设我们没有其他更好的选择。

我们看一下这个应用的大致代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Service
public class PaymentService {
@Autowired
private ExternalCashBackService externalCashBackService;
@Autowired
private AccountService accountService;
public void withdrawMoney(UUID userId, int amountOfMoney) {
synchronized (userId) {
Result result = externalCashBackService.evaluateCashBack(userId, amountOfMoney);
accountService.transfer(userId, amountOfMoney + result.getCashBackAmount());
externalCashBackService.cashBackComplete(userId, result.getCashBackAmount());
}
}
}
@Service
public class ExternalCashBackService {
@Autowired
private RestTemplate restTemplate;
public Result evaluateCashBack(UUID userId, int amountOfMoney) {
return sendRestRequest("evaluate", userId, amountOfMoney);
}
public Result cashBackComplete(UUID userId, int cashBackAmount) {
return sendRestRequest("complete", userId, cashBackAmount);
}
private Result sendRestRequest(String action, UUID userId, int value) {
URI externalCashBackSystemUrl =
URI.create("http://cash-back-system.org/api/" + action);
HttpHeaders headers = new HttpHeaders();
headers.set("Accept", MediaType.APPLICATION_JSON_VALUE);
RequestDto requestDto = new RequestDto(userId, value);
HttpEntity<?> request = new HttpEntity<>(requestDto, headers);
ResponseDto responseDto = restTemplate.exchange(externalCashBackSystemUrl,
HttpMethod.GET,
request,
ResponseDto.class)
.getBody();
return new Result(responseDto.getStatus(), responseDto.getValue());
}
}
@Service
public class AccountService {
@Autowired
private AccountRepository accountRepository;
@Transactional(isolation = REPEATABLE_READ)
public void transfer(UUID userId, int amountOfMoney) {
Account account = accountRepository.getOne(userId);
account.setBalance(account.getBalance() - amountOfMoney);
accountRepository.save(account);
}
}

现实情况是,你将会有多个有相同值的对象(在这个例子中是userId),但是同步器依赖的是实例对象而不是对象的具体值。译者注:包含相同字面值的对象不一定是同一个对象

下面的代码不能正常工作,因为没有进行正确的同步操作;UUID.fromString(..)这个静态工厂方法在每次被调用的时候创建了新的UUID类型的实例,即使你传入的是相同的字符串参数。

所以我们获得了多个不一样的UUID实例,但是他们的值是相同的。如果我们在多线程下运行这段代码,我们将有很大的可能性遇到同步问题:

1
2
3
4
5
6
public void threadA() {
paymentService.withdrawMoney(UUID.fromString("ea051187-bb4b-4b07-9150-700000000000"), 1000);
}
public void threadB() {
paymentService.withdrawMoney(UUID.fromString("ea051187-bb4b-4b07-9150-700000000000"), 5000);
}

在这样的情况下,你应该获得多个同样的实例来进行同步操作。

错误的解决方法

同步整个方法

你可以在方法上添加synchronized关键词:

1
2
3
public synchronized void withdrawMoney(UUID userId, int amountOfMoney) {
..
}

这个解决方法性能很差。你将锁住所有用户的现金转账操作。而且如果你需要在不同的类中同步不同的操作,这个方案将毫无作用。

String Intern

为了确保我们能够在多个同步锁中获得相同的持有用户ID的实例,我们可以把它序列化到String对象中,并通过调用String.intern()来获得相同的对象引用。

String.intern()使用了全局的对象池来存储字符串对象。当你调用这个方法时,如果对象池中包含这个字符串译者注:这里的字符串指的是字符串的字面量,而不是这个对象,那么将得到池中对象的引用;如果池中不包含这个字符串,那么这个字符串将被加入到对象池中。

你可以在The Java Language Specification - 3.10.5 String Literals或者Oracle的Java文档关于String.intern章节中查看更多关于String.intern()的细节。

1
2
3
4
5
public void withdrawMoney(UUID userId, int amountOfMoney) {
synchronized (userId.toString().intern()) {
..
}
}

使用intern方法不是一个特别好的方法,因为字符串池在GC时很难清理。而且使用String.intern()后你的应用程序将会产生太多活跃资源。

同样的,在你的系统中,一些外部代码进行同步操作的时候使用了跟相同的字符串实例,这可能会导致死锁。

一般情况下,尽可能把使用intern的机会留给JDK库中的代码;有一篇Aleksey Shipilev的优秀文章中提到了这一点

我们该如何正确解决这个问题?

创建你自己的同步原语

我们需要实现下图描述的行为:

diag 0672834a7737bb323990aabe3bcb5ce6

首先,我们需要创建一个新的同步原语——定制的互斥锁。这个锁将依赖于变量的值,而不是对象的引用。

这就有点像一个『姓名锁』,但比『姓名锁』的定义更宽泛。这个锁可以给任何对象进行身份鉴定,而不仅仅是用在字符串上。你可以在其他编程语言(例如C++、C#)中找到同步原语进行锁操作的例子。现在,我们将用它解决Java中的问题。

这个解决方法大概看起来会是这样的:

1
2
3
4
5
public void withdrawMoney(UUID userId, int amountOfMoney) {
synchronized (XMutex.of(userId)) {
..
}
}

为了确保相同的变量字面值可以获得同一个互斥锁,我们创建了锁工厂。

1
2
3
4
5
6
7
8
9
10
public void withdrawMoney(UUID userId, int amountOfMoney) {
synchronized (XMutexFactory.get(userId)) {
..
}
}
public void purchase(UUID userId, int amountOfMoney, VendorDescription vendor) {
synchronized (XMutexFactory.get(userId)) {
..
}
}

为了给每一个相等的值返回同样的锁实例,我们需要把创建的互斥锁存起来。如果我们只是通过普通的HashMap存储,那么map的容量将会在每一次新的值出现的时候增加。我们没有什么工具能判断一个锁是否再也不会被使用译者注:每当有新的值出现时,都要创建一个新的记录,但是没有办法删除,因为不知道什么时候这个值还会不会再被使用。

在这种情况下,当需要使用的时候,我们可以用WeakReference来保存锁的引用。为了实现这样的操作,可以使用WeakHashMap这个数据结构。我在几个月前写过一篇关于这个类型的引用的文章,你可以在这里看到更多的细节:Soft, Weak, Phantom References in Java

我们的锁工厂将基于WeakHashMap。当这个锁用value(key)计算的值在HashMap里找不到时,这个锁工厂将创建一个新的锁。然后再把新创建的锁添加到HashMap中。使用WeakHashMap允许我们往HashMap里存储任何已存在的锁引用,并且当这个锁引用被释放时,将会自动从HashMap中移除。

我们需要使用同步版的WeakHashMap,可以看文档中是怎么描述的:

This class is not synchronized. A synchronized WeakHashMap may be constructed
using the Collections.synchronizedMap method.

这个类不是同步的,一个同步版的WeakHashMap可以使用Collections.synchronizedMap方法创建。

看起来很不幸(WeakHashMap不是同步的),但是我们再仔细看看这个描述,也许我们可用考虑通过官方文档提供的建议(我的意思是使用Collections.synchronizedMap)来写一个实现的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final Map<XMutex<KeyT>, WeakReference<XMutex<KeyT>>> weakHashMap =
Collections.synchronizedMap(new WeakHashMap<XMutex<KeyT>,
WeakReference<XMutex<KeyT>>>());
public XMutex<KeyT> getMutex(KeyT key) {
validateKey(key);
return getExist(key)
.orElseGet(() -> saveNewReference(key));
}
private Optional<XMutex<KeyT>> getExist(KeyT key) {
return Optional.ofNullable(weakHashMap.get(XMutex.of(key)))
.map(WeakReference::get);
}
private XMutex<KeyT> saveNewReference(KeyT key) {
XMutex<KeyT> mutex = XMutex.of(key);
WeakReference<XMutex<KeyT>> res = weakHashMap.put(mutex, new WeakReference<>(mutex));
if (res != null && res.get() != null) {
return res.get();
}
return mutex;
}

性能怎么样?

当我们翻看一下Collections.synchronizedMap的源码,可以发现很多使用了全局锁来实现与创建SynchronizedMap实例相关的同步操作。

1
2
3
4
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}

而且所有的SynchronizedMap中的其他方法都使用了锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public int size() {
synchronized (mutex) {return m.size();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}
...

这个解决方案并没有特别好的性能表现。所有的这些同步操作将会导致我们在对锁工厂的所有操作都会被上锁。

用WeakReference作为ConcurrentHashMap的Key

我们需要考虑使用ConcurrentHashMap。这比使用Collections.synchronizedMap有更好的性能。但是我们有一个问题——ConcurrentHashMap不允许使用弱引用(weak references),这意味着垃圾收集器不能清理没有的锁。

我发现有两种方式解决这个问题:

  • 第一种是创建我们自己的ConcurrentMap实现。这是一个正确的做法,但是它将花费很长时间.
  • 第二种方式是使用ConcurrentReferenceHashMap,它在Spring框架中被实现了。这是一个很棒的实现类,但是它有一些,下面会提到。

我们用ConcurrentReferenceHashMap改造一下XMutexFactory的实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class XMutexFactory<KeyT> {
/**
* Create mutex factory with default settings
*/
public XMutexFactory() {
this.map = new ConcurrentReferenceHashMap<>(DEFAULT_INITIAL_CAPACITY,
DEFAULT_LOAD_FACTOR,
DEFAULT_CONCURRENCY_LEVEL,
DEFAULT_REFERENCE_TYPE);
}
/**
* Creates and returns a mutex by the key.
* If the mutex for this key already exists in the weak-map,
* then returns the same reference of the mutex.
*/
public XMutex<KeyT> getMutex(KeyT key) {
return this.map.compute(key, (k, v) -> (v == null) ? new XMutex<>(k) : v);
}
}

太酷了!很少的代码,但是性能比之前更好。我们来测试一下这种方案的性能。

创建一个简单的基准测试

我创建了一个小的基准测试来挑选一个实现方式。

在测试中有三种Map的实现方式:

  • Collections.synchronizedMap方式基于WeakHashMap
  • ConcurrentHashMap
  • ConcurrentReferenceHashMap

我在基准测试中使用ConcurrentHashMap是为了将其作为测试基准。这种实现不适合使用在我们的锁工厂中,因为它不支持使用虚引用或软引用。

所有的基准测试是通过JMH库输出的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Run complete. Total time: 00:04:39
Benchmark Mode Cnt Score Error Units
ConcurrentMap.ConcurrentHashMap thrpt 5 0,015 ? 0,004 ops/ns
ConcurrentMap.ConcurrentReferenceHashMap thrpt 5 0,008 ? 0,001 ops/ns
ConcurrentMap.SynchronizedMap thrpt 5 0,005 ? 0,001 ops/ns
ConcurrentMap.ConcurrentHashMap avgt 5 565,515 ? 23,638 ns/op
ConcurrentMap.ConcurrentReferenceHashMap avgt 5 1098,939 ? 28,828 ns/op
ConcurrentMap.SynchronizedMap avgt 5 1503,593 ? 150,552 ns/op
ConcurrentMap.ConcurrentHashMap sample 301796 663,330 ? 11,708 ns/op
ConcurrentMap.ConcurrentReferenceHashMap sample 180062 1110,882 ? 6,928 ns/op
ConcurrentMap.SynchronizedMap sample 136290 1465,543 ? 5,150 ns/op
ConcurrentMap.ConcurrentHashMap ss 5 336419,150 ? 617549,053 ns/op
ConcurrentMap.ConcurrentReferenceHashMap ss 5 922844,750 ? 468380,489 ns/op
ConcurrentMap.SynchronizedMap ss 5 1199159,700 ? 4339391,394 ns/op

在这个小的基准测试中,我创建了多个线程使用这个map的场景,你可以访问Concurrent Map benchmark查看更多细节。

放一张结果图:

benchmark result

所以,ConcurrentReferenceHashMap被证明在这个场景下使用是合理的。

开始使用XSync库

我将代码打包到了 XSync中,你可以直接使用这个功能在需要通过使用字面量进行同步操作的场景中。

为了使用它,你可以添加下面的依赖:

1
2
3
4
5
<dependency>
<groupId>com.antkorwin</groupId>
<artifactId>xsync</artifactId>
<version>1.1</version>
</dependency>

然后,你可以创建一个XSync实例在你需要使用同步器的时候,在Spring框架中,你可以这样创建一个Bean

1
2
3
4
@Bean
public XSync<UUID> xSync(){
return new XSync<>();
}

现在,你可以使用它了:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Autowired
private XSync<UUID> xSync;
public void withdrawMoney(UUID userId, int amountOfMoney) {
xSync.execute(userId, () -> {
Result result = externalPolicySystem.validateTransfer(userId, amountOfMoney, WITHDRAW);
accountService.transfer(userId, amountOfMoney, WITHDRAW);
});
}
public void purchase(UUID userId, int amountOfMoney, VendorDescription vendor) {
xSync.execute(userId, () -> {
..
});
}

并发测试

为了确保代码运行正常,我写了一些并发测试用例。

这是其中一个测试用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void testSyncBySingleKeyInConcurrency() {
// Arrange
XSync<UUID> xsync = new XSync<>();
String id = UUID.randomUUID().toString();
NonAtomicInt var = new NonAtomicInt(0);
// There is a magic here:
// we created a parallel stream and try to increment
// the same nonatomic integer variable in each stream
IntStream.range(0, THREAD_CNT)
.boxed()
.parallel()
.forEach(j -> xsync.execute(UUID.fromString(id), var::increment));
// Asserts
await().atMost(5, TimeUnit.SECONDS)
.until(var::getValue, equalTo(THREAD_CNT));
Assertions.assertThat(var.getValue()).isEqualTo(THREAD_CNT);
}
/**
* Implementation of the does not thread safe integer variable:
*/
@Getter
@AllArgsConstructor
private class NonAtomicInt {
private int value;
public int increment() {
return value++;
}
}

让我们看一下测试结果:

concurrent test result

参考

GitHub中的XSync库:https://github.com/antkorwin/xsync

XSync的使用案例:https://github.com/antkorwin/xsync-example

原文在这里

如果这篇文章对你有帮助,可以请作者喝杯咖啡~