基于网关的API调用平台的核心流程(学习记录)

  • 无论什么请求,先进入网关过滤器。
	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		// 日志
		ServerHttpRequest request = exchange.getRequest();
		log.info("【GlobalFilter】请求方法:{},请求路径:{},请求URL:{}",
				request.getMethod(), request.getPath(), request.getURI());

		// 请求的路径不是接口服务,直接放行
		if (!request.getPath().toString().contains("/api/interface"))
			return chain.filter(exchange);

		// 接口调用前进行安全检验
		return verifyParameters(exchange, chain);
	}
  • 此时去调用接口。先走后端 invoke 方法

api-gatewayapplication.yml 定义

spring:
  cloud:
    gateway:
      routes:
        - id: api-backend
          uri: lb://api-backend
          predicates: Path=/api/backend/**
        - id: api-interface
          uri: lb://api-interface
          predicates: Path=/api/interface/**

那么 此时 这个 /api/backend/interfaceInfo/invoke 请求 命中 id = api-backend 的路由。走后端服务

invoke

@PostMapping("/invoke")
	@Transactional(rollbackFor = Exception.class)
	public BaseResponse<Object> invokeInterface(@RequestBody InvokeRequest invokeRequest, HttpServletRequest request) {
		if (ObjectUtils.anyNull(invokeRequest, invokeRequest.getId()) || invokeRequest.getId() <= 0) {
			throw new BusinessException(ResponseCode.PARAMS_ERROR);
		}
		Long id = invokeRequest.getId();
		InterfaceInfo interfaceInfo = interfaceInfoService.getById(id);
		if (interfaceInfo == null) {
			throw new BusinessException(ResponseCode.NOT_FOUND_ERROR);
		}
		if (interfaceInfo.getStatus() != InterfaceStatusEnum.ONLINE.getValue()) {
			throw new BusinessException(ResponseCode.PARAMS_ERROR, "接口未开启");
		}
		UserVo loginUser = userService.getLoginUser(request);
		if (loginUser.getBalance() < interfaceInfo.getReduceScore()) {
			throw new BusinessException(50001, "余额不足,请先充值。");
		}
		// 构建请求参数
		List<InvokeRequest.Field> fieldList = invokeRequest.getRequestParams();
		String requestParams = "{}";
		if (fieldList != null && fieldList.size() > 0) {
			JsonObject jsonObject = new JsonObject();
			for (InvokeRequest.Field field : fieldList) {
				if (StringUtils.isNotBlank(field.getValue()))
					jsonObject.addProperty(field.getFieldName(), field.getValue());
			}
			requestParams = gson.toJson(jsonObject);
		}
		Map<String, Object> params = new Gson().fromJson(requestParams, new TypeToken<Map<String, Object>>() {
		}.getType());
		String accessKey = loginUser.getAccessKey();
		String secretKey = loginUser.getSecretKey();
		try {
			ApiClient apiClient = new ApiClient(accessKey, secretKey);
			CurrencyRequest currencyRequest = new CurrencyRequest();
			currencyRequest.setMethod(interfaceInfo.getMethod());
			currencyRequest.setPath(interfaceInfo.getUrl());
			currencyRequest.setRequestParams(params);
            // 这里发送新的请求,要走网关!
			ResultResponse response = apiService.request(apiClient, currencyRequest);

			Map<String, Object> data = response.getData();

			// 处理网关抛出的异常
			if (data.get("code") != null && (double) data.get("code") == 403)
				return ResponseUtil.error(OPERATION_ERROR, (String) data.get("message"));

			return ResponseUtil.success(data);
		} catch (Exception e) {
			throw new BusinessException(ResponseCode.SYSTEM_ERROR, e.getMessage());
		}
	}
  • apiService.request(apiClient, currencyRequest); 再次走到网关请求过滤器。

  • 因为这次请求路径中包含 /api/interface ,所以走到verifyParameters 函数,做接口校验。

接口校验

网关中实际接口调用的流程


@Component
@Slf4j
public class GatewayGlobalFilter implements GlobalFilter {
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("1. 进入网关过滤器");
        return verifyParameters(exchange, chain);
    }

    private Mono<Void> verifyParameters(ServerWebExchange exchange, GatewayFilterChain chain) {
        // ...验证逻辑...
        log.info("2. 【GlobalFilter】安全检验通过。");
        return handleResponse(exchange, chain, user, interfaceInfo);
    }

    private Mono<Void> handleResponse(ServerWebExchange exchange, GatewayFilterChain chain, UserVo user, InterfaceInfo interfaceInfo) {
        log.info("3. 进入响应处理器");
        ServerHttpResponse originalResponse = exchange.getResponse();
        DataBufferFactory bufferFactory = originalResponse.bufferFactory();
        
        ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                log.info("5. 接收到实际服务响应");
                if (body instanceof Flux) {
                    Flux<? extends DataBuffer> fluxBody = Flux.from(body);
                    return super.writeWith(
                            fluxBody.map(dataBuffer -> {
                                // 处理响应...
                                log.info("6. 处理响应数据");
                                return bufferFactory.wrap(content);
                            }));
                }
                return super.writeWith(body);
            }
        };
        
        log.info("4. 转发请求到实际服务");
        // 这里通过chain.filter进行实际的请求转发
        return chain.filter(exchange.mutate().response(decoratedResponse).build());
    }
}

执行流程追踪

  1. filter → “1. 进入网关过滤器”
  2. verifyParameters → “2. 安全检验通过”
  3. handleResponse → “3. 进入响应处理器”
  4. chain.filter() → “4. 转发请求到实际服务”
  5. writeWith → “5. 接收到实际服务响应”
  6. 响应处理 → “6. 处理响应数据”

  1. chain.filter() → “4. 转发请求到实际服务”

  2. writeWith → “5. 接收到实际服务响应”

  3. 响应处理 → “6. 处理响应数据”

  • 下面这段代码使用了装饰器模式。writeWith 方法在实际服务响应返回后才会被调用 ,使用了响应式编程(Reactive)处理响应流。有点不好理解
if (statusCode == HttpStatus.OK) {
			// 装饰,增强能力
			ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {
				// 等调用完转发的接口后才会执行
				@Override
				public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
					log.info("5. 接收到实际服务响应");
					if (body instanceof Flux) {
						Flux<? extends DataBuffer> fluxBody = Flux.from(body);
						// 往返回值里写数据
						return super.writeWith(
								fluxBody.map(dataBuffer -> {
									// 扣除积分
									redissonLockUtil.redissonDistributedLocks(("gateway_" + user.getUserAccount()).intern(), () -> {
										boolean invoke = interfaceInvokeService.invoke(interfaceInfo.getId(), user.getId(), interfaceInfo.getReduceScore());
										if (!invoke) {
											throw new BusinessException(ResponseCode.OPERATION_ERROR, "接口调用失败");
										}
									}, "接口调用失败");
									byte[] content = new byte[dataBuffer.readableByteCount()];
									dataBuffer.read(content);
									// 释放掉内存
									DataBufferUtils.release(dataBuffer);
									String data = new String(content, StandardCharsets.UTF_8);
									// 打印日志
									log.info("【GlobalFilter】接口调用结果:" + data);
									log.info("6. 处理响应数据");
									return bufferFactory.wrap(content);
								}));
					} else {
						// 8. 调用失败,返回一个规范的错误码
						log.error("【GlobalFilter】接口响应异常:{}", getStatusCode());
					}
					return super.writeWith(body);
				}
			};
			log.info("4. 转发请求到实际服务");
			// 这里通过chain.filter进行实际的请求转发
			return chain.filter(exchange.mutate().response(decoratedResponse).build());
		}
3 个赞

太强了,大佬

2 个赞

java spring微服务的那块知识了,有点深,难懂

1 个赞

是的,大佬有那种源码课学习吗?

1 个赞

是自己的一个小项目