- 无论什么请求,先进入网关过滤器。
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 日志
ServerHttpRequest request = exchange.getRequest();
log.info("【GlobalFilter】请求方法:{},请求路径:{},请求URL:{}",
request.getMethod(), request.getPath(), request.getURI());
// 请求的路径不是接口服务,直接放行
if (!request.getPath().toString().contains("/api/interface"))
return chain.filter(exchange);
// 接口调用前进行安全检验
return verifyParameters(exchange, chain);
}
- 此时去调用接口。先走后端
invoke
方法
api-gateway
的 application.yml
定义
spring:
cloud:
gateway:
routes:
- id: api-backend
uri: lb://api-backend
predicates: Path=/api/backend/**
- id: api-interface
uri: lb://api-interface
predicates: Path=/api/interface/**
那么 此时 这个 /api/backend/interfaceInfo/invoke
请求 命中 id = api-backend
的路由。走后端服务
invoke
@PostMapping("/invoke")
@Transactional(rollbackFor = Exception.class)
public BaseResponse<Object> invokeInterface(@RequestBody InvokeRequest invokeRequest, HttpServletRequest request) {
if (ObjectUtils.anyNull(invokeRequest, invokeRequest.getId()) || invokeRequest.getId() <= 0) {
throw new BusinessException(ResponseCode.PARAMS_ERROR);
}
Long id = invokeRequest.getId();
InterfaceInfo interfaceInfo = interfaceInfoService.getById(id);
if (interfaceInfo == null) {
throw new BusinessException(ResponseCode.NOT_FOUND_ERROR);
}
if (interfaceInfo.getStatus() != InterfaceStatusEnum.ONLINE.getValue()) {
throw new BusinessException(ResponseCode.PARAMS_ERROR, "接口未开启");
}
UserVo loginUser = userService.getLoginUser(request);
if (loginUser.getBalance() < interfaceInfo.getReduceScore()) {
throw new BusinessException(50001, "余额不足,请先充值。");
}
// 构建请求参数
List<InvokeRequest.Field> fieldList = invokeRequest.getRequestParams();
String requestParams = "{}";
if (fieldList != null && fieldList.size() > 0) {
JsonObject jsonObject = new JsonObject();
for (InvokeRequest.Field field : fieldList) {
if (StringUtils.isNotBlank(field.getValue()))
jsonObject.addProperty(field.getFieldName(), field.getValue());
}
requestParams = gson.toJson(jsonObject);
}
Map<String, Object> params = new Gson().fromJson(requestParams, new TypeToken<Map<String, Object>>() {
}.getType());
String accessKey = loginUser.getAccessKey();
String secretKey = loginUser.getSecretKey();
try {
ApiClient apiClient = new ApiClient(accessKey, secretKey);
CurrencyRequest currencyRequest = new CurrencyRequest();
currencyRequest.setMethod(interfaceInfo.getMethod());
currencyRequest.setPath(interfaceInfo.getUrl());
currencyRequest.setRequestParams(params);
// 这里发送新的请求,要走网关!
ResultResponse response = apiService.request(apiClient, currencyRequest);
Map<String, Object> data = response.getData();
// 处理网关抛出的异常
if (data.get("code") != null && (double) data.get("code") == 403)
return ResponseUtil.error(OPERATION_ERROR, (String) data.get("message"));
return ResponseUtil.success(data);
} catch (Exception e) {
throw new BusinessException(ResponseCode.SYSTEM_ERROR, e.getMessage());
}
}
apiService.request(apiClient, currencyRequest);
再次走到网关请求过滤器。
- 因为这次请求路径中包含
/api/interface
,所以走到verifyParameters
函数,做接口校验。
接口校验
网关中实际接口调用的流程
@Component
@Slf4j
public class GatewayGlobalFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("1. 进入网关过滤器");
return verifyParameters(exchange, chain);
}
private Mono<Void> verifyParameters(ServerWebExchange exchange, GatewayFilterChain chain) {
// ...验证逻辑...
log.info("2. 【GlobalFilter】安全检验通过。");
return handleResponse(exchange, chain, user, interfaceInfo);
}
private Mono<Void> handleResponse(ServerWebExchange exchange, GatewayFilterChain chain, UserVo user, InterfaceInfo interfaceInfo) {
log.info("3. 进入响应处理器");
ServerHttpResponse originalResponse = exchange.getResponse();
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
log.info("5. 接收到实际服务响应");
if (body instanceof Flux) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(
fluxBody.map(dataBuffer -> {
// 处理响应...
log.info("6. 处理响应数据");
return bufferFactory.wrap(content);
}));
}
return super.writeWith(body);
}
};
log.info("4. 转发请求到实际服务");
// 这里通过chain.filter进行实际的请求转发
return chain.filter(exchange.mutate().response(decoratedResponse).build());
}
}
执行流程追踪
- filter → “1. 进入网关过滤器”
- verifyParameters → “2. 安全检验通过”
- handleResponse → “3. 进入响应处理器”
- chain.filter() → “4. 转发请求到实际服务”
- writeWith → “5. 接收到实际服务响应”
- 响应处理 → “6. 处理响应数据”
- 详细执行流程:
-
chain.filter() → “4. 转发请求到实际服务”
-
writeWith → “5. 接收到实际服务响应”
-
响应处理 → “6. 处理响应数据”
- 下面这段代码使用了装饰器模式。
writeWith
方法在实际服务响应返回后才会被调用 ,使用了响应式编程(Reactive)处理响应流。有点不好理解
if (statusCode == HttpStatus.OK) {
// 装饰,增强能力
ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
// 等调用完转发的接口后才会执行
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
log.info("5. 接收到实际服务响应");
if (body instanceof Flux) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
// 往返回值里写数据
return super.writeWith(
fluxBody.map(dataBuffer -> {
// 扣除积分
redissonLockUtil.redissonDistributedLocks(("gateway_" + user.getUserAccount()).intern(), () -> {
boolean invoke = interfaceInvokeService.invoke(interfaceInfo.getId(), user.getId(), interfaceInfo.getReduceScore());
if (!invoke) {
throw new BusinessException(ResponseCode.OPERATION_ERROR, "接口调用失败");
}
}, "接口调用失败");
byte[] content = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(content);
// 释放掉内存
DataBufferUtils.release(dataBuffer);
String data = new String(content, StandardCharsets.UTF_8);
// 打印日志
log.info("【GlobalFilter】接口调用结果:" + data);
log.info("6. 处理响应数据");
return bufferFactory.wrap(content);
}));
} else {
// 8. 调用失败,返回一个规范的错误码
log.error("【GlobalFilter】接口响应异常:{}", getStatusCode());
}
return super.writeWith(body);
}
};
log.info("4. 转发请求到实际服务");
// 这里通过chain.filter进行实际的请求转发
return chain.filter(exchange.mutate().response(decoratedResponse).build());
}