Spring Cloud Feign OkHttp Hystrix 常见问题&深入源码分析

Spring Cloud Feign OkHttp Hystrix 常见问题&深入源码分析return new okhttp3.OkHttpClient.Builder() * Create a new connecti

大家好,欢迎来到IT知识分享网。

1、Spring Cloud Feign 最佳实践

在项目中,调用服务接口我们推荐使用feign并开启hystix熔断。

由于我们未采取Springcloud微服务的方案,所以就必须要在 @FeignClient 填写 url 这个参数指定地址。

微服务的情况下,需要注意配置ribbon,本文章忽略之

1.1、推荐配置

开启 hystix熔断,关闭feign的重试机制,使用okhttp并关闭okhttp的重试机制,feign的回退方式使用工厂模式。

1) maven

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> 

2) FeignOkHttpConfig

@Configuration @ConditionalOnClass({Feign.class}) @AutoConfigureBefore(FeignAutoConfiguration.class) public class FeignOkHttpConfig { @Value("${feign.okhttp3.connect-timeout.milliseconds:500}") private Long connectTimeout; @Value("${feign.okhttp3.read-timeout.milliseconds:1000}") private Long readTimeout; @Value("${feign.okhttp3.write-timeout.milliseconds:60000}") private Long writeTimeout; @Bean public okhttp3.OkHttpClient okHttpClient() { return new okhttp3.OkHttpClient.Builder() .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) .readTimeout(readTimeout, TimeUnit.MILLISECONDS) .writeTimeout(writeTimeout, TimeUnit.MILLISECONDS) .retryOnConnectionFailure(false) // 关闭重试机制 .connectionPool(new okhttp3.ConnectionPool()) .build(); } // 关闭重试 @Bean Retryer feignRetry() { return Retryer.NEVER_RETRY; } } 

3) 配置文件

#feign feign: httpclient: enabled: false hystrix: enabled: true # hystrix 启用 okhttp: enabled: true okhttp3: connect-timeout: milliseconds: 3000 read-timeout: milliseconds: 3000 write-timeout: milliseconds: 60000 hystrix: command: default: execution: isolation: thread: timeoutInMilliseconds: 3000 bokeccLive: #cc云直播的熔断超时时间为1s execution: isolation: thread: timeoutInMilliseconds: 1000 study: #ep-study的熔断超时时间为1s execution: isolation: thread: timeoutInMilliseconds: 1000 resource: #资源中心的熔断超时时间为3s execution: isolation: thread: timeoutInMilliseconds: 3000 sso: #资源中心的熔断超时时间为3s execution: isolation: thread: timeoutInMilliseconds: 3000 threadpool: default: coreSize: 10 maxQueueSize: 100 bokeccLive: # cc云直播熔断线程池配置,QPS为50 coreSize: 50 maxQueueSize: 100 study: #ep-study熔断线程池配置,QPS为10 coreSize: 10 maxQueueSize: 100 resource: #资源中心熔断线程池配置,QPS为10 coreSize: 30 maxQueueSize: 100 sso: #公共服务熔断线程池配置,QPS为10 coreSize: 50 maxQueueSize: 100 

下面就是我们线上大量系统优化后的生产经验总结:

假设你的服务A,每秒钟会接收30个请求,同时会向服务B发起30个请求,然后每个请求的响应时长经验值大概在200ms,那么你的hystrix线程池需要多少个线程呢?

计算公式是:30(每秒请求数量) * 0.2(每个请求的处理秒数) + 4(给点缓冲buffer) = 10(线程数量)

必须设置合理的参数,避免高峰期,频繁的hystrix线程卡死

如果hystix超时时间设置为500ms,那么1s中可以处理2个线程,所以如果需要让一个服务器达到100的并发,那么核心线程数需要配置到50才能达到处理每秒100的请求;

1.2、推荐实现

feign 的请求使用SpringMvc的注解,并且要求必须有回退且使用工厂模式

1) FeignClient

@FeignClient(name = "sso", url = "${services.ssoService.url}", fallbackFactory = SsoFeignClientFallbackFactory.class) public interface SsoFeignClient { /** * 学生 ID 获取用户信息 * * @param userId 学生 ID * @return 用户信息 */ @GetMapping(value = "/getbaseuserinfo/{userid}", headers = {"origin=gaodun.com"}) BaseUserInfoResponse getBaseUserInfo(@PathVariable("userid") String userId); } 

2) FallbackFactory

@Slf4j @Component public class SsoFeignClientFallbackFactory implements FallbackFactory<SsoFeignClient> { @Override public SsoFeignClient create(Throwable throwable) { return userId -> { log.error("getBaseUserInfo,fallback;reason was:{}", throwable.getMessage(), throwable); return BaseUserInfoResponse.fallbackResult(); }; } } 

2、Spring Cloud Feign 性能优化

2.1 测试代码

feign&接口:

@FeignClient(name = "live", url = "http://127.0.0.1:8080/", fallbackFactory = LiveFeignClientFallbackFactory.class) public interface LiveFeignClient { @GetMapping("/live/okhttp") BusinessResponse<Void> okhttp(); } 
@GetMapping("/okhttp") public BusinessResponse<Void> okhttp() { return BusinessResponse.ok(); } 

代码的运行结果:

13:50:59.620 [http-nio-8080-exec-175] INFO [] com.gaodun.storm.vod.aspect.ControllerLogAspect - LiveController.okhttp请求结束,耗时:0ms 13:50:59.619 [http-nio-8080-exec-138] INFO [] com.gaodun.storm.vod.aspect.ControllerLogAspect - LiveController.feignOkhttp请求结束,耗时:215ms 

以上的代码目的是检验feign的各项配置,由于接口的响应时间几乎忽略不计,那么请求接口的响应时长的表现,就是feign在调用第三方接口时,根据并发的情况和配置情况所需要的自身处理的时长

2.2 结果对比

jmeter 压测

1)默认项目配置测试结果

#使用okhttp,开启熔断 feign: httpclient: enabled: false hystrix: enabled: true okhttp: enabled: true hystrix: command: default: execution: isolation: thread: timeoutInMilliseconds:  circuitBreaker: requestVolumeThreshold: 10000 live: execution: isolation: thread: timeoutInMilliseconds:  circuitBreaker: requestVolumeThreshold: 10000 threadpool: default: coreSize: 100 maxQueueSize: 10000 queueSizeRejectionThreshold: 8000 live: coreSize: 100 maxQueueSize: 10000 queueSizeRejectionThreshold: 8000 

默认压测结果截图:

Spring Cloud Feign OkHttp Hystrix 常见问题&深入源码分析

Spring Cloud Feign OkHttp Hystrix 常见问题&深入源码分析

2)根据经验配置优化以后的结果

hystrix: command: default: execution: isolation: thread: timeoutInMilliseconds:  circuitBreaker: requestVolumeThreshold: 10000 live: execution: isolation: thread: timeoutInMilliseconds: 1000 threadpool: default: coreSize: 100 maxQueueSize: 10000 queueSizeRejectionThreshold: 8000 live: coreSize: 10 maxQueueSize: 100 

优化后的压测结果:

Spring Cloud Feign OkHttp Hystrix 常见问题&深入源码分析

Spring Cloud Feign OkHttp Hystrix 常见问题&深入源码分析

2.3 总结

合理的设置核心线程池的大小,参考推荐配置;(Thread Pools 的 Active 设置的越大,当并发没有那么高的情况下,处理线程的耗时越长)

3、Spring Cloud Feign 实现原理

在微服务中,会使用到负载均衡,Feign集成了ribbon来实现,那么实际上处理 HTTP URL 请求的是 feignClient(…) 方法中的 feign.okhttp.LoadBalancerFeignClient.execute(…) 方法

其中okhttp的版本为 3.8.1,feign集成okhttp的版本如下:

<dependency> <groupId>io.github.openfeign</groupId> <artifactId>feign-okhttp</artifactId> <version>10.1.0</version> </dependency> 

3.1 Feign 执行流程,源码解析(非微服务的情况下)

SynchronousMethodHandler 这里使用默认的, 如果是在微服务的情况下使用feign,那么需要分析的是:FeignLoadBalancer,也就是要理解Ribbon

1)SynchronousMethodHandler 同步方法处理器

请看在源码上的注释

final class SynchronousMethodHandler implements MethodHandler { private static final long MAX_RESPONSE_BUFFER_SIZE = 8192L; private final MethodMetadata metadata; private final Target<?> target; private final Client client; private final Retryer retryer; private final List<RequestInterceptor> requestInterceptors; private final Logger logger; private final Logger.Level logLevel; private final RequestTemplate.Factory buildTemplateFromArgs; private final Options options; private final Decoder decoder; private final ErrorDecoder errorDecoder; private final boolean decode404; private final boolean closeAfterDecode; private final ExceptionPropagationPolicy propagationPolicy; private SynchronousMethodHandler(Target<?> target, Client client, Retryer retryer, List<RequestInterceptor> requestInterceptors, Logger logger, Logger.Level logLevel, MethodMetadata metadata, RequestTemplate.Factory buildTemplateFromArgs, Options options, Decoder decoder, ErrorDecoder errorDecoder, boolean decode404, boolean closeAfterDecode, ExceptionPropagationPolicy propagationPolicy) { this.target = checkNotNull(target, "target"); this.client = checkNotNull(client, "client for %s", target); this.retryer = checkNotNull(retryer, "retryer for %s", target); this.requestInterceptors = checkNotNull(requestInterceptors, "requestInterceptors for %s", target); this.logger = checkNotNull(logger, "logger for %s", target); this.logLevel = checkNotNull(logLevel, "logLevel for %s", target); this.metadata = checkNotNull(metadata, "metadata for %s", target); this.buildTemplateFromArgs = checkNotNull(buildTemplateFromArgs, "metadata for %s", target); this.options = checkNotNull(options, "options for %s", target); this.errorDecoder = checkNotNull(errorDecoder, "errorDecoder for %s", target); this.decoder = checkNotNull(decoder, "decoder for %s", target); this.decode404 = decode404; this.closeAfterDecode = closeAfterDecode; this.propagationPolicy = propagationPolicy; } @Override public Object invoke(Object[] argv) throws Throwable { RequestTemplate template = buildTemplateFromArgs.create(argv); Retryer retryer = this.retryer.clone(); while (true) { try { return executeAndDecode(template); } catch (RetryableException e) { try { retryer.continueOrPropagate(e); } catch (RetryableException th) { Throwable cause = th.getCause(); if (propagationPolicy == UNWRAP && cause != null) { throw cause; } else { throw th; } } if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue; } } } Object executeAndDecode(RequestTemplate template) throws Throwable { Request request = targetRequest(template); if (logLevel != Logger.Level.NONE) { logger.logRequest(metadata.configKey(), logLevel, request); } Response response; long start = System.nanoTime(); try { // 使用 OkHttpClient 获取结果 response = client.execute(request, options); } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start)); } throw errorExecuting(request, e); } long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); // 解析此Response对象,解析后return(返回Object:可能是Response实例,也可能是decode解码后的任意类型)。 boolean shouldClose = true; try { if (logLevel != Logger.Level.NONE) { response = logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime); } if (Response.class == metadata.returnType()) { if (response.body() == null) { return response; } if (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) { shouldClose = false; return response; } // Ensure the response body is disconnected byte[] bodyData = Util.toByteArray(response.body().asInputStream()); return response.toBuilder().body(bodyData).build(); } if (response.status() >= 200 && response.status() < 300) { if (void.class == metadata.returnType()) { return null; } else { Object result = decode(response); shouldClose = closeAfterDecode; return result; } } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) { Object result = decode(response); shouldClose = closeAfterDecode; return result; } else { throw errorDecoder.decode(metadata.configKey(), response); } } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime); } throw errorReading(request, response, e); } finally { if (shouldClose) { ensureClosed(response.body()); } } } long elapsedTime(long start) { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); } Request targetRequest(RequestTemplate template) { for (RequestInterceptor interceptor : requestInterceptors) { interceptor.apply(template); } return target.apply(template); } Object decode(Response response) throws Throwable { try { return decoder.decode(response, metadata.returnType()); } catch (FeignException e) { throw e; } catch (RuntimeException e) { throw new DecodeException(e.getMessage(), e); } } static class Factory { private final Client client; private final Retryer retryer; private final List<RequestInterceptor> requestInterceptors; private final Logger logger; private final Logger.Level logLevel; private final boolean decode404; private final boolean closeAfterDecode; private final ExceptionPropagationPolicy propagationPolicy; Factory(Client client, Retryer retryer, List<RequestInterceptor> requestInterceptors, Logger logger, Logger.Level logLevel, boolean decode404, boolean closeAfterDecode, ExceptionPropagationPolicy propagationPolicy) { this.client = checkNotNull(client, "client"); this.retryer = checkNotNull(retryer, "retryer"); this.requestInterceptors = checkNotNull(requestInterceptors, "requestInterceptors"); this.logger = checkNotNull(logger, "logger"); this.logLevel = checkNotNull(logLevel, "logLevel"); this.decode404 = decode404; this.closeAfterDecode = closeAfterDecode; this.propagationPolicy = propagationPolicy; } public MethodHandler create(Target<?> target, MethodMetadata md, RequestTemplate.Factory buildTemplateFromArgs, Options options, Decoder decoder, ErrorDecoder errorDecoder) { return new SynchronousMethodHandler(target, client, retryer, requestInterceptors, logger, logLevel, md, buildTemplateFromArgs, options, decoder, errorDecoder, decode404, closeAfterDecode, propagationPolicy); } } } 

2)重试机制

默认的重试机制:

默认的请求次数为5次,如下: 参数一:为下次发起重试请求 生成间隔时间算法的参数(时间单位:毫秒) 参数二:距下次发起重试请求最大的间隔时间(时间单位:毫秒) public Default() { this(100, SECONDS.toMillis(1), 5); } 

获取下一次重试间隔时间

/** * Calculates the time interval to a retry attempt. <br> * The interval increases exponentially with each attempt, at a rate of nextInterval *= 1.5 * (where 1.5 is the backoff factor), to the maximum interval. * * @return time in nanoseconds from now until the next attempt. */ long nextMaxInterval() { long interval = (long) (period * Math.pow(1.5, attempt - 1)); return interval > maxPeriod ? maxPeriod : interval; } 

取消feign重试

//取消重试 @Bean Retryer feignRetry() { return Retryer.NEVER_RETRY; } 

3.2、 Feign 使用OkHttp,Okhttp的实现原理

1)feign client 通过 OkHttpClient 完成request 到 Response的一次请求

使用 feign.okhttp.OkHttpClient (注意和okhttp3.OkHttpClient是不一样的),

实际上处理 HTTP URL 请求的是 feignClient(…) 方法中的 feign.okhttp.OkHttpClient.execute(…) 方法,源码如下:

@Override public feign.Response execute(feign.Request input, feign.Request.Options options) throws IOException { okhttp3.OkHttpClient requestScoped; if (delegate.connectTimeoutMillis() != options.connectTimeoutMillis() || delegate.readTimeoutMillis() != options.readTimeoutMillis()) { requestScoped = delegate.newBuilder() .connectTimeout(options.connectTimeoutMillis(), TimeUnit.MILLISECONDS) .readTimeout(options.readTimeoutMillis(), TimeUnit.MILLISECONDS) .followRedirects(options.isFollowRedirects()) .build(); } else { requestScoped = delegate; } Request request = toOkHttpRequest(input); Response response = requestScoped.newCall(request).execute(); return toFeignResponse(response, input).toBuilder().request(input).build(); } 

在默认没有配置的情况下,Options的初始化参数如下:

public Options() { this(10 * 1000, 60 * 1000); } // connectTimeoutMillis = 10_000 // readTimeoutMillis = 60_000 

也可以通过配置文件来修改默认配置,配置如下:

// default 为默认的,也可以根据feign的value值,单独配置 feign.client.config.default.connectTimeout=1000 feign.client.config.default.readTimeout=10000 

从源码可以看到,feign使用okhttp时,超时时间优先根据client的时间来设置

2)okhttp 执行层

// 其中requestScoped 是 okhttp3.OkHttpClient 的实例 Response response = requestScoped.newCall(request).execute(); 

这是应用程序中发起网络请求最顶端的调用,newCall(request) 方法返回 RealCall 对象。

RealCall 封装了一个 request 代表一个请求调用任务,RealCall 有两个重要的方法 execute() 和 enqueue(Callback responseCallback)。

execute() 是直接在当前线程执行请求,enqueue(Callback responseCallback) 是将当前任务加到任务队列中,执行异步请求。(异步请求的执行流程会在后续讲解)

3) 同步请求

@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); try { client.dispatcher().executed(this); Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } finally { client.dispatcher().finished(this); } } 

从执行层说到连接层,涉及到 getResponseWithInterceptorChain 方法中组织的各个拦截器的执行过程,其中 getResponseWithInterceptorChain 是关键,它使用了 责任链设计模式

4) 连接器执行过程(关键)

Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); } 

5) Okhttp3 拦截器 RetryAndFollowUpInterceptor 重试机制

此拦截器将从故障中恢复,并根据需要执行重定向。如果调用被取消,它会抛出 *{@link IOException}

关于重定向次数:

/** * How many redirects and auth challenges should we attempt? Chrome follows 21 redirects; Firefox, * curl, and wget follow 20; Safari follows 16; and HTTP/1.0 recommends 5. */ private static final int MAX_FOLLOW_UPS = 20; 

6) Okhttp3 拦截器 BridgeInterceptor 桥梁

一个实现应用层和网络层直接的数据格式编码的桥。

第一: 把应用层客户端传过来的请求对象转换为 Http 网络协议所需字段的请求对象。第二: 把下游网络请求结果转换为应用层客户所需要的响应对象

默认设置HTTP长连接(开启Keep-Alive功能可使客户端到服务器端的连接持续有效,当出现对服务器的后继请求时,Keep-Alive功能避免了建立或者重新建立连接。)

if (userRequest.header("Connection") == null) { requestBuilder.header("Connection", "Keep-Alive"); } 

7) Okhttp3 拦截器 CacheInterceptor 缓存

为来自缓存的请求提供服务,并将响应写入缓存

8) Okhttp3 拦截器 ConnectInterceptor 连接

打开到目标服务器的连接并继续到下一个拦截器。

9) Okhttp3 拦截器 CallServerInterceptor 网络调用

这是链中的最后一个拦截器。它对服务器进行网络调用。

Spring Cloud Feign OkHttp Hystrix 常见问题&深入源码分析

10) ConnectionPool 实现

Spring Cloud Feign OkHttp Hystrix 常见问题&深入源码分析

管理HTTP和HTTP/2连接的重用以减少网络延迟

HTTP请求共享同一{@link Address} ,共享同一{@link Connection}

实现策略为将来使用而保持开放的连接。

默认实现中,使用一个双向队列来缓存所有连接, 这些连接中最多只能存在 5 个空闲连接,空闲连接最多只能存活 5 分钟。

 /** * Create a new connection pool with tuning parameters appropriate for a single-user application. * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity. */ public ConnectionPool() { this(5, 5, TimeUnit.MINUTES); } 

如何复用Connection:遍历了所有的连接,然后判断某个连接是否可以复用;http1.x协议下当前socket没有其他流正在读写时可以复用,否则不行,http2.0对流数量没有限制。

如何清理连接池:每次put一个新连接的时候都会判断是否需要清理。遍历当前所有连接,跳过正在使用的连接,其他没有用的连接,如果哪个连接超过了规定的时间,就关掉这个socket。如果都没有超过规定时间的,就返回离规定时间最近的那个差值。拿到那个时间值后,我们再回到上面那个cleanupRunnable中,在那里会wait线程,然后醒来继续清理

以上论述,仅代表个人观点,作者水平有限,如有错误,欢迎批评指正。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/54544.html

(0)
上一篇 2024-07-31 16:26
下一篇 2024-08-01 18:15

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信