设计实现一个通用的接口幂等框架

需求场景

我们先来看下幂等框架的需求场景。

调用方访问公共服务平台的接口,会有三种可能的结果:成功、失败和超时。前两种结果非常明确,调用方可以自己决定收到结果之后如何处理。结果为“成功”,万事大吉。结果为“失败”,一般情况下,调用方会将失败的结果,反馈给用户(移动端 App),让用户自行决定是否重试。但是,当接口请求超时时,处理起来就没那么容易了。

第一种处理方式是,调用方访问公共服务平台接口超时时,返回清晰明确的提醒给用户,告知执行结果未知,让用户自己判断是否重试。不过,你可能会说,如果用户看到了超时提醒,但还是重新发起了操作,比如重新发起了转账、充值等操作,那该怎么办呢?实际上,对这种情况,技术是无能为力的。因为两次操作都是用户主动发起的,我们无法判断第二次的转账、充值是新的操作,还是基于上一次超时的重试行为。

第二种处理方式是,调用方调用其他接口,来查询超时操作的结果,明确超时操作对应的业务,是执行成功了还是失败了,然后再基于明确的结果做处理。但是这种处理方法存在一个问题,那就是并不是所有的业务操作,都方便查询操作结果。

第三种处理方式是,调用方在遇到接口超时之后,直接发起重试操作。这样就需要接口支持幂等。我们可以选择在业务代码中触发重试,也可以将重试的操作放到 Feign 框架中完成。因为偶尔发生的超时,在正常的业务逻辑中编写一大坨补救代码,这样做会影响到代码的可读性,有点划不来。当然,如果项目中需要支持超时重试的业务不多,那对于仅有几个业务,特殊处理一下也未尝不可。但是,如果项目中需要支持超时重试的业务比较多,我们最好是把超时重试这些非业务相关的逻辑,统一在框架层面解决。

需求分析

功能性需求
刚刚我们介绍了幂等框架的需求背景:超时重试需要接口幂等的支持。接下来,我们再对需求进行更加详细的分析和整理,这其中就包括功能性需求和非功能性需求。

超时重试需要接口幂等的支持。
放到接口调用的这个场景里,幂等的意思是,针对同一个接口,多次发起同一个业务请求,必须保证业务只执行一次。

非功能性需求
在易用性方面,我们希望框架接入简单方便,学习成本低。只需编写简单的配置以及少许代码,就能完成接入。除此之外,框架最好对业务代码低侵入松耦合,在统一的地方(比如 Spring AOP 中)接入幂等框架,而不是将它耦合在业务代码中。

在性能方面,针对每个幂等接口,在正式处理业务逻辑之前,我们都要添加保证幂等的处理逻辑。这或多或少地会增加接口请求的响应时间。而对于响应时间比较敏感的接口服务来说,我们要让幂等框架尽可能低延迟,尽可能减少对接口请求本身响应时间的影响。

在容错性方面,跟限流框架相同,不能因为幂等框架本身的异常,导致接口响应异常,影响服务本身的可用性。所以,幂等框架要有高度的容错性。比如,存储幂等号的外部存储器挂掉了,幂等逻辑无法正常运行,这个时候业务接口也要能正常服务才行。

设计

正常情况下,幂等处理流程是非常简单的,难点在于如何应对异常情况。在这三个阶段中,如果第一个阶段出现异常,比如发送请求失败或者超时,幂等号还没有记录下来,重试请求会被执行,符合我们的预期。如果第三个阶段出现异常,业务逻辑执行完成了,只是在发送结果给调用方的时候,失败或者超时了,这个时候,幂等号已经记录下来,重试请求不会被执行,也符合我们的预期。也就是说,第一、第三阶段出现异常,上述的幂等处理逻辑都可以正确应对。

但是,如果第二个阶段业务执行的过程出现异常,处理起来就复杂多了。接下来,我们就看下幂等框架该如何应对这一阶段的各种异常。我分了三类异常来讲解,它们分别是业务代码异常、业务系统宕机、幂等框架异常。

业务代码异常处理

实际上,为了让幂等框架尽可能的灵活,低侵入业务逻辑,发生异常(不管是业务异常还是系统异常),是否允许再重试执行业务逻辑,交给开发这块业务的工程师来决定是最合适的了,毕竟他最清楚针对每个异常该如何处理。而幂等框架本身不参与这个决定,它只需要提供删除幂等号的接口,由业务工程师来决定遇到异常的时候,是否需要调用这个删除接口,删除已经记录的幂等号。

业务系统宕机处理

我们建议业务系统记录 SQL 的执行日志,在日志中附加上幂等号。这样我们就能在机器宕机时,根据日志来判断业务执行情况和幂等号的记录是否一致。

幂等框架异常处理

对于幂等来说,尽管它应对的也是超时重试等特殊场景,但是,如果本不应该重新执行的业务逻辑,因为幂等功能的暂时失效,被重复执行了,就会导致业务出错(比如,多次执行转账,钱多转了)。对于这种情况,绝大部分业务场景都是无法接受的。所以,在幂等逻辑执行异常时,我们选择让接口请求也失败,相应的业务逻辑就不会被重复执行了。毕竟接口请求失败(比如转钱没转成功),比业务执行出错(比如多转了钱),修复的成本要低很多。

代码实现

主要包含下面这样两个主要的功能开发点:

  • 实现生成幂等号的功能;
  • 实现存储、查询、删除幂等号的功能。

我们先来看,如何生成幂等号。

幂等号用来标识两个接口请求是否是同一个业务请求,换句话说,两个接口请求是否是重试关系,而非独立的两个请求。接口调用方需要在发送接口请求的同时,将幂等号一块传递给接口实现方。那如何来生成幂等号呢?一般有两种生成方式。一种方式是集中生成并且分派给调用方,另一种方式是直接由调用方生成。
对于第一种生成方式,我们需要部署一套幂等号的生成系统,并且提供相应的远程接口(Restful 或者 RPC 接口),调用方通过调用远程接口来获取幂等号。这样做的好处是,对调用方完全隐藏了幂等号的实现细节。当我们需要改动幂等号的生成算法时,调用方不需要改动任何代码。
对于第二种生成方式,调用方按照跟接口实现方预先商量好的算法,自己来生成幂等号。这种实现方式的好处在于,不用像第一种方式那样调用远程接口,所以执行效率更高。但是,一旦需要修改幂等号的生成算法,就需要修改每个调用方的代码。并且,每个调用方自己实现幂等号的生成算法也会有问题。

一方面,重复开发,违反 DRY 原则。另一方面,工程师的开发水平层次不齐,代码难免会有 bug。除此之外,对于复杂的幂等号生成算法,比如依赖外部系统 Redis 等,显然更加适合上一种实现方式,可以避免调用方为了使用幂等号引入新的外部系统。权衡来讲,既考虑到生成幂等号的效率,又考虑到代码维护的成本,我们选择第二种实现方式,并且在此基础上做些改进,由幂等框架来统一提供幂等号生成算法的代码实现,并封装成开发类库,提供给各个调用方复用。

除此之外,我们希望生成幂等号的算法尽可能的简单,不依赖其他外部系统。实际上,对于幂等号的唯一要求就是全局唯一。全局唯一 ID 的生成算法有很多。比如,简单点的有取 UUID,复杂点的可以把应用名拼接在 UUID 上,方便做问题排查。总体上来讲,幂等号的生成算法并不难。

我们再来看,如何实现幂等号的存储、查询和删除
从现在的需求来看,幂等号只是为了判重。在数据库中,我们只需要存储一个幂等号就可以,不需要太复杂的存储结构,所以,我们不选择使用复杂的关系型数据库,而是选择使用更加简单的、读写更加快速的键值数据库,比如 Redis。

在幂等判重逻辑中,我们需要先检查幂等号是否存在。如果没有存在,再将幂等号存储进 Redis。多个线程(同一个业务实例的多个线程)或者多进程(多个业务实例)同时执行刚刚的“检查 - 设置”逻辑时,就会存在竞争关系(竞态,race condition)。比如,A 线程检查幂等号不存在,在 A 线程将幂等号存储进 Redis 之前,B 线程也检查幂等号不存在,这样就会导致业务被重复执行。为了避免这种情况发生,我们要给“检查 - 设置”操作加锁,让同一时间只有一个线程能执行。除此之外,为了避免多进程之间的竞争,普通的线程锁还不起作用,我们需要分布式锁。

引入分布式锁会增加开发的难度和复杂度,而 Redis 本身就提供了把“检查 - 设置”操作作为原子操作执行的命令:setnx(key, value)。它先检查 key 是否存在,如果存在,则返回结果 0;如果不存在,则将 key 值存下来,并将值设置为 value,返回结果 1。因为 Redis 本身是单线程执行命令的,所以不存在刚刚讲到的并发问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 代码目录结构
com.xzg.cd.idempotence
--Idempotence
--IdempotenceIdGenerator(幂等号生成类)
--IdempotenceStorage(接口:用来读写幂等号)
--RedisClusterIdempotenceStorage(IdempotenceStorage的实现类)

// 每个类的代码实现
public class Idempotence {
private IdempotenceStorage storage;

public Idempotence(IdempotenceStorage storage) {
this.storage = storage;
}

public boolean saveIfAbsent(String idempotenceId) {
return storage.saveIfAbsent(idempotenceId);
}

public void delete(String idempotenceId) {
storage.delete(idempotenceId);
}
}

public class IdempotenceIdGenerator {
public String generateId() {
return UUID.randomUUID().toString();
}
}

public interface IdempotenceStorage {
boolean saveIfAbsent(String idempotenceId);
void delete(String idempotenceId);
}

public class RedisClusterIdempotenceStorage implements IdempotenceStorage {
private JedisCluster jedisCluster;

/**
* Constructor
* @param redisClusterAddress the format is 128.91.12.1:3455;128.91.12.2:3452;289.13.2.12:8978
* @param config should not be null
*/
public RedisIdempotenceStorage(String redisClusterAddress, GenericObjectPoolConfig config) {
Set<HostAndPort> redisNodes = parseHostAndPorts(redisClusterAddress);
this.jedisCluster = new JedisCluster(redisNodes, config);
}

public RedisIdempotenceStorage(JedisCluster jedisCluster) {
this.jedisCluster = jedisCluster;
}

/**
* Save {@idempotenceId} into storage if it does not exist.
* @param idempotenceId the idempotence ID
* @return true if the {@idempotenceId} is saved, otherwise return false
*/
public boolean saveIfAbsent(String idempotenceId) {
Long success = jedisCluster.setnx(idempotenceId, "1");
return success == 1;
}

public void delete(String idempotenceId) {
jedisCluster.del(idempotenceId);
}

@VisibleForTesting
protected Set<HostAndPort> parseHostAndPorts(String redisClusterAddress) {
String[] addressArray= redisClusterAddress.split(";");
Set<HostAndPort> redisNodes = new HashSet<>();
for (String address : addressArray) {
String[] hostAndPort = address.split(":");
redisNodes.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1])));
}
return redisNodes;
}
}