Gateway Categories
Common open-source gateways, grouped by implementation language, include:
Nginx + Lua: OpenResty, Kong, etc.;
Java: Zuul1/Zuul2, Spring Cloud Gateway, gravitee-gateway, Shenyu, etc.;
Go: Janus, GoKu API Gateway, etc.;
Node.js: Express Gateway, MicroGateway, etc.
We mainly considered Java-based gateways, so we looked into Zuul1, Zuul2, Spring Cloud Gateway, and Dromara Soul.
Mainstream gateways in the industry can roughly be divided into the following four kinds:
For the technology selection, the main criteria were feature richness, performance, and stability. Zuul2 is a high-performance, asynchronous, non-blocking gateway built on the Netty framework.
Zuul2 at a Glance
Next, we briefly introduce the key concepts of Zuul2.
Zuul2's architecture diagram:
To explain the diagram above, the following sections walk through a few key points.
How the HTTP Protocol Is Parsed
Learning Zuul2 requires some background knowledge, such as Google Guice, RxJava, and Netflix Archaius, but the more important question is how the HTTP protocol is parsed, because it determines how the filter chain works later on. So we analyze this key point first.
Let's start with a passage from the official documentation:
By default Zuul doesn’t buffer body content, meaning it streams the received headers to the origin before the body has been received.
This streaming behavior is very efficient and desirable, as long as your filter logic depends on header data.
Paraphrased:
By default, Zuul2 does not buffer the request body. That means it may forward the received request headers to the backend service before the body has arrived; and when it forwards the body, it does not assemble a complete payload first but relays each piece as soon as it is received.
This streaming behavior is efficient and desirable, as long as your filter logic only depends on header data and does not need to parse the request body.
Mapped onto Netty handlers, this means that Zuul2 does not use HttpObjectAggregator.
Let's first look at how a typical Netty server handles HTTP.
A NettyServer example:
```java
@Slf4j
public class ConfigServerBootstrap {

    public static final int WORKER_THREAD_COUNT = Runtime.getRuntime().availableProcessors();

    public void start() {
        int port = 8080;
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(WORKER_THREAD_COUNT);
        final BizServerHandler bizServerHandler = new BizServerHandler();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new IdleStateHandler(10, 10, 0));
                        pipeline.addLast(new HttpServerCodec());
                        pipeline.addLast(new HttpObjectAggregator(500 * 1024 * 1024));
                        pipeline.addLast(bizServerHandler);
                    }
                });
            log.info("start netty server, port:{}", port);
            serverBootstrap.bind(port).sync();
        } catch (InterruptedException e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            log.error(String.format("start netty server error, port:%s", port), e);
        }
    }
}
```
The two key classes in this example are HttpServerCodec and HttpObjectAggregator.
HttpServerCodec is a combination of HttpRequestDecoder and HttpResponseEncoder.
HttpObjectAggregator assembles an HttpMessage and its HttpContent parts into a complete FullHttpRequest or FullHttpResponse.
It is very useful when you do not want to care about chunked transfer encoding yourself.
HTTP normally uses Content-Length to declare the body length; on the server side that means allocating a buffer of that length up front and then filling it. If you want to produce data and send it at the same time, you use "Transfer-Encoding: chunked" instead of Content-Length, i.e. the data is transferred in chunks.
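To make chunked transfer concrete, here is a minimal Netty sketch (illustrative code, not taken from Zuul2; the class name is made up): the handler writes the response headers first and then streams body chunks as they are produced, finishing with a LastHttpContent.

```java
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

// Assumes an HttpServerCodec sits earlier in the pipeline; it encodes these objects
// into a "Transfer-Encoding: chunked" response on the wire.
public class ChunkedResponseHandler extends SimpleChannelInboundHandler<HttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) {
        DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
        ctx.write(response);                                                                     // headers go out first
        ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer("part-1\n", CharsetUtil.UTF_8))); // body chunk
        ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer("part-2\n", CharsetUtil.UTF_8))); // another chunk
        ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);                                   // marks end of body
    }
}
```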
Next, let's look at what Zuul2 does to parse the HTTP protocol.
Zuul source code: https://github.com/Netflix/zuul, based on v2.1.5.
```java
protected void addHttp1Handlers(ChannelPipeline pipeline) {
    pipeline.addLast(HTTP_CODEC_HANDLER_NAME, createHttpServerCodec());
    pipeline.addLast(new Http1ConnectionCloseHandler(connCloseDelay));
    pipeline.addLast("conn_expiry_handler",
            new Http1ConnectionExpiryHandler(maxRequestsPerConnection, maxRequestsPerConnectionInBrownout, connectionExpiry));
}
```
```java
protected HttpServerCodec createHttpServerCodec() {
    return new HttpServerCodec(
            MAX_INITIAL_LINE_LENGTH.get(),
            MAX_HEADER_SIZE.get(),
            MAX_CHUNK_SIZE.get(),
            false
    );
}
```
Comparing this with the earlier example, Zuul2 does not add an HttpObjectAggregator, so it has to handle chunked transfer itself and assemble the request body on its own.
To solve the chunked-transfer problem, Zuul2 checks whether a message is a LastHttpContent to determine whether the request has been fully received, as sketched below.
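The idea can be illustrated with a minimal handler sketch (again illustrative, not Zuul2's actual code): without an HttpObjectAggregator, the pipeline delivers an HttpRequest followed by HttpContent chunks, and only a LastHttpContent signals that the body is complete.

```java
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;

public class ManualBodyAggregationHandler extends SimpleChannelInboundHandler<HttpObject> {
    private final CompositeByteBuf body = Unpooled.compositeBuffer();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
        if (msg instanceof HttpRequest) {
            // Headers are available here, possibly before any body bytes have arrived.
        } else if (msg instanceof HttpContent) {
            // Accumulate each chunk of the body as it streams in.
            body.addComponent(true, ((HttpContent) msg).content().retain());
            if (msg instanceof LastHttpContent) {
                // Only now is the request body known to be complete.
            }
        }
    }
}
```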
Zuul2 Data Flow
As shown in the figure above, Netty's built-in HttpServerCodec converts the raw network bytes into a Netty HttpRequest object, and the ClientRequestReceiver codec then converts that HttpRequest into Zuul's own request object, HttpRequestMessageImpl.
The request body is mapped by HttpServerCodec to HttpContent objects, which ClientRequestReceiver receives one by one.
After these conversions, the data reaches the most important handler, ZuulFilterChainHandler, which runs the filter chain and also issues the network request to the real backend service; all of this happens inside ZuulFilterChainHandler.
Once the backend response has been obtained and has passed through the outbound filters, ClientResponseWriter converts Zuul's own response object, HttpResponseMessageImpl, into a Netty HttpResponse, which HttpServerCodec turns into ByteBuf objects written back to the network, completing the response.
One point deserves special mention: because Zuul2 does not assemble a complete request or response object by default, it runs the filters separately for the request headers and for the request body. In other words, a request passes through the inbound filter chain twice, and a response likewise passes through the outbound filter chain twice.
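For a filter that genuinely needs the body, Zuul2 lets it opt into buffering. A minimal sketch, assuming Zuul2's HttpInboundSyncFilter API (the class name and filter order here are made up for illustration):

```java
import com.netflix.zuul.filters.http.HttpInboundSyncFilter;
import com.netflix.zuul.message.http.HttpRequestMessage;

public class BodyInspectingInboundFilter extends HttpInboundSyncFilter {
    @Override
    public int filterOrder() {
        return 10;
    }

    @Override
    public boolean shouldFilter(HttpRequestMessage request) {
        return true;
    }

    @Override
    public boolean needsBodyBuffered(HttpRequestMessage request) {
        // Ask Zuul to aggregate the HttpContent chunks before apply() is invoked.
        return true;
    }

    @Override
    public HttpRequestMessage apply(HttpRequestMessage request) {
        String body = request.getBodyAsText(); // the complete body is available here
        // ... inspect or rewrite the body ...
        return request;
    }
}
```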
Two Chains of Responsibility
Netty ChannelPipeline Chain
Netty's ChannelPipeline is designed for customization and extension by dynamically adding and removing handlers in the pipeline.
Let's look at which handlers make up the pipeline of the Zuul2 Netty server.
Then, which handlers the Zuul2 Netty client uses.
This article does not explain each handler in detail; the aim is to give an overall picture.
Filter Chain
A request arriving at the Netty server is first intercepted by the inbound filters, then handed to the endpoint filter, which by default is ProxyEndpoint (wrapping the Netty client); it sends the request to the real backend service, and after the response is obtained the outbound filters run and the response is finally returned.
The three types of filters are linked together through the nextStage property.
Zuul2 runs a scheduled task thread, GroovyFilterFileManagerPoller, which periodically scans specific directories and compares file modification timestamps to detect changes; when a file has changed, it is recompiled and loaded into memory.
This polling task is what makes dynamic filter loading possible.
Source Code Walkthrough
Viewed as a whole, Zuul2 is a service that runs a series of filters on top of Netty: after the pre-filters (inbound filters) have executed, it forwards the request through the Netty client, and then returns the result through a series of post-filters (outbound filters).
Zuul2 filters come in three types:
Inbound filters: run before routing;
Endpoint filters: perform the routing operation;
Outbound filters: run after the response data has been obtained (see the sketch below).
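As a concrete illustration of the outbound type, here is a sketch of a filter that stamps a response header, assuming Zuul2's HttpOutboundSyncFilter API (class name and header are illustrative):

```java
import com.netflix.zuul.filters.http.HttpOutboundSyncFilter;
import com.netflix.zuul.message.http.HttpResponseMessage;

public class StampHeaderOutboundFilter extends HttpOutboundSyncFilter {
    @Override
    public int filterOrder() {
        return 20;
    }

    @Override
    public boolean shouldFilter(HttpResponseMessage response) {
        return true;
    }

    @Override
    public HttpResponseMessage apply(HttpResponseMessage response) {
        // Runs after the origin response has been received, before it is written back to the client.
        response.getHeaders().set("X-Gateway", "zuul2");
        return response;
    }
}
```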
We use the official demo, zuul-sample, for the analysis, as shown in the figure below:
ServerStartup
In the demo's startup code we find the entry point:
```java
try {
    Injector injector = Guice.createInjector(new ZuulSampleModule());
    BaseServerStartup serverStartup = injector.getInstance(BaseServerStartup.class);
    server = serverStartup.server();

    server.start();
    long startupDuration = System.nanoTime() - startNanos;
    logger.info("Zuul Sample: finished startup. Duration = {}ms",
            TimeUnit.NANOSECONDS.toMillis(startupDuration));

    server.awaitTermination();
} catch (Throwable t) {
    ...
} finally {
    ...
}
```
Google Guice is used for dependency injection to obtain a BaseServerStartup object, and then the server is started.
We take this start() method as our entry point:
```java
public void start() {
    if (jvmShutdownHook != null) {
        Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
    }
    serverGroup = new ServerGroup(
            "Salamander",
            eventLoopConfig.acceptorCount(),
            eventLoopConfig.eventLoopCount(),
            eventLoopGroupMetrics);
    serverGroup.initializeTransport();
    try {
        List<ChannelFuture> allBindFutures = new ArrayList<>(addressesToInitializers.size());

        for (Map.Entry<NamedSocketAddress, ? extends ChannelInitializer<?>> entry : addressesToInitializers.entrySet()) {
            NamedSocketAddress requestedNamedAddr = entry.getKey();
            ChannelFuture nettyServerFuture = setupServerBootstrap(requestedNamedAddr, entry.getValue());
            Channel chan = nettyServerFuture.channel();
            addressesToChannels.put(requestedNamedAddr.withNewSocket(chan.localAddress()), chan);
            allBindFutures.add(nettyServerFuture);
        }

        if (!allBindFutures.isEmpty()) {
            ByteBufAllocator alloc = allBindFutures.get(0).channel().alloc();
            if (alloc instanceof ByteBufAllocatorMetricProvider) {
                ByteBufAllocatorMetric metrics = ((ByteBufAllocatorMetricProvider) alloc).metric();
                PolledMeter.using(registry)
                        .withId(registry.createId("zuul.nettybuffermem.live", "type", "heap"))
                        .monitorValue(metrics, ByteBufAllocatorMetric::usedHeapMemory);
                PolledMeter.using(registry)
                        .withId(registry.createId("zuul.nettybuffermem.live", "type", "direct"))
                        .monitorValue(metrics, ByteBufAllocatorMetric::usedDirectMemory);
            }
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
```
As you can see, this mainly wires up the bind futures; the interesting call is setupServerBootstrap. We know that Netty binds its ports through a ServerBootstrap, so let's focus on this function:
```java
private ChannelFuture setupServerBootstrap(
        NamedSocketAddress listenAddress, ChannelInitializer<?> channelInitializer) throws InterruptedException {
    ServerBootstrap serverBootstrap =
            new ServerBootstrap().group(serverGroup.clientToProxyBossPool, serverGroup.clientToProxyWorkerPool);

    LOG.info("Proxy listening with {}", serverGroup.channelType);
    serverBootstrap.channel(serverGroup.channelType);

    serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
    serverBootstrap.childOption(ChannelOption.SO_LINGER, -1);
    serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

    for (Map.Entry<ChannelOption<?>, ?> optionEntry : serverGroup.transportChannelOptions.entrySet()) {
        serverBootstrap = serverBootstrap.option((ChannelOption) optionEntry.getKey(), optionEntry.getValue());
    }

    serverBootstrap.handler(new NewConnHandler());
    serverBootstrap.childHandler(channelInitializer);
    serverBootstrap.validate();

    LOG.info("Binding to : {}", listenAddress);

    if (MANUAL_DISCOVERY_STATUS.get()) {
        serverStatusManager.localStatus(InstanceInfo.InstanceStatus.UP);
    }

    ChannelFuture bindFuture = serverBootstrap.bind(listenAddress.unwrap());
    try {
        return bindFuture.sync();
    } catch (Exception e) {
        throw new RuntimeException("Failed to bind on addr " + listenAddress, e);
    }
}
```
Here a serverBootstrap object is created and configured with its options; this is Netty's bootstrap class and the core of the Zuul2 server. Finally, bindFuture.sync() is called to wait for the bind to complete.
We have now seen how Zuul2 starts a Netty server. The next step is to understand how the all-important filters work; in the startup code above we already encountered a crucial object, channelInitializer.
```java
serverBootstrap.childHandler(channelInitializer);
```
In Netty, a series of handlers is assembled and executed through a pipeline, so this channelInitializer is the natural place to start.
channelInitializer is passed into setupServerBootstrap as a parameter. Tracing the caller, we find that it comes from the map addressesToInitializers (it is the value of each map entry). Let's find the definition of that map:
```java
private final Map<NamedSocketAddress, ? extends ChannelInitializer<?>> addressesToInitializers;

public Server(
        Registry registry,
        ServerStatusManager serverStatusManager,
        Map<NamedSocketAddress, ? extends ChannelInitializer<?>> addressesToInitializers,
        ClientConnectionsShutdown clientConnectionsShutdown,
        EventLoopGroupMetrics eventLoopGroupMetrics,
        EventLoopConfig eventLoopConfig,
        Thread jvmShutdownHook) {
    this.registry = Objects.requireNonNull(registry);
    this.addressesToInitializers = Collections.unmodifiableMap(new LinkedHashMap<>(addressesToInitializers));
    this.serverStatusManager = Preconditions.checkNotNull(serverStatusManager, "serverStatusManager");
    this.clientConnectionsShutdown = Preconditions.checkNotNull(clientConnectionsShutdown, "clientConnectionsShutdown");
    this.eventLoopConfig = Preconditions.checkNotNull(eventLoopConfig, "eventLoopConfig");
    this.eventLoopGroupMetrics = Preconditions.checkNotNull(eventLoopGroupMetrics, "eventLoopGroupMetrics");
    this.jvmShutdownHook = jvmShutdownHook;
}
```
Next, let's find where Server is initialized: in the init method of BaseServerStartup.
```java
@Inject
public void init() throws Exception {
    ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    clientConnectionsShutdown =
            new ClientConnectionsShutdown(clientChannels, GlobalEventExecutor.INSTANCE, discoveryClient);
    addrsToChannelInitializers = chooseAddrsAndChannels(clientChannels);

    server = new Server(
            registry,
            serverStatusManager,
            addrsToChannelInitializers,
            clientConnectionsShutdown,
            eventLoopGroupMetrics,
            eventLoopConfig);
}
```
Here the addrsToChannelInitializers map we have been chasing is built by the chooseAddrsAndChannels function.
Digging further:
```java
@ForOverride
protected Map<NamedSocketAddress, ChannelInitializer<?>> chooseAddrsAndChannels(ChannelGroup clientChannels) {
    @SuppressWarnings("unchecked")
    Map<Integer, ChannelInitializer<?>> portMap =
            (Map<Integer, ChannelInitializer<?>>) (Map) choosePortsAndChannels(clientChannels);
    return Server.convertPortMap(portMap);
}

@Deprecated
protected Map<Integer, ChannelInitializer> choosePortsAndChannels(ChannelGroup clientChannels) {
    throw new UnsupportedOperationException("unimplemented");
}
```
The method is protected and must be implemented by a subclass, otherwise it throws.
Looking into ServerStartup, we find exactly the construction logic of the channelInitializer. To save space, only the HTTP case is shown here; the other protocols are similar.
```java
switch (SERVER_TYPE) {
    case HTTP:
        channelConfig.set(
                CommonChannelConfigKeys.allowProxyHeadersWhen, StripUntrustedProxyHeadersHandler.AllowWhen.ALWAYS);
        channelConfig.set(CommonChannelConfigKeys.preferProxyProtocolForClientIp, false);
        channelConfig.set(CommonChannelConfigKeys.isSSlFromIntermediary, false);
        channelConfig.set(CommonChannelConfigKeys.withProxyProtocol, false);

        addrsToChannels.put(
                new NamedSocketAddress("http", sockAddr),
                new ZuulServerChannelInitializer(metricId, channelConfig, channelDependencies, clientChannels) {
                    @Override
                    protected void addHttp1Handlers(ChannelPipeline pipeline) {
                        super.addHttp1Handlers(pipeline);
                        pipeline.addLast(new HttpContentCompressor((CompressionOptions[]) null));
                    }
                });
        logAddrConfigured(sockAddr);
        break;

    case HTTP2:
    case HTTP_MUTUAL_TLS:
    case WEBSOCKET:
    case SSE:
        // ... (other protocols omitted here)
}
return Collections.unmodifiableMap(addrsToChannels);
```
ZuulServerChannelInitializer
So the ChannelInitializer is in fact a ZuulServerChannelInitializer. In its initChannel we find:
```java
@Override
protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    storeChannel(ch);
    addTimeoutHandlers(pipeline);
    addPassportHandler(pipeline);
    addTcpRelatedHandlers(pipeline);
    addHttp1Handlers(pipeline);
    addHttpRelatedHandlers(pipeline);
    addZuulHandlers(pipeline);
}
```
The earlier calls are fairly simple and add standard handlers; the most important one is addZuulHandlers(pipeline), so let's dig into it.
```java
protected void addZuulHandlers(final ChannelPipeline pipeline) {
    pipeline.addLast("logger", nettyLogger);
    pipeline.addLast(new ClientRequestReceiver(sessionContextDecorator));
    pipeline.addLast(passportLoggingHandler);
    addZuulFilterChainHandler(pipeline);
    pipeline.addLast(new ClientResponseWriter(requestCompleteHandler, registry));
}
```
Most of the handlers above are easy to recognize: logging, session context, and so on. The key call is addZuulFilterChainHandler, which is where the Zuul filters are added.
Going deeper:
```java
protected void addZuulFilterChainHandler(final ChannelPipeline pipeline) {
    final ZuulFilter<HttpResponseMessage, HttpResponseMessage>[] responseFilters = getFilters(
            new OutboundPassportStampingFilter(PassportState.FILTERS_OUTBOUND_START),
            new OutboundPassportStampingFilter(PassportState.FILTERS_OUTBOUND_END)); // ➊

    final ZuulFilterChainRunner<HttpResponseMessage> responseFilterChain =
            getFilterChainRunner(responseFilters, filterUsageNotifier);

    final FilterRunner<HttpRequestMessage, HttpResponseMessage> endPoint =
            getEndpointRunner(responseFilterChain, filterUsageNotifier, filterLoader);

    final ZuulFilter<HttpRequestMessage, HttpRequestMessage>[] requestFilters = getFilters(
            new InboundPassportStampingFilter(PassportState.FILTERS_INBOUND_START),
            new InboundPassportStampingFilter(PassportState.FILTERS_INBOUND_END));

    final ZuulFilterChainRunner<HttpRequestMessage> requestFilterChain =
            getFilterChainRunner(requestFilters, filterUsageNotifier, endPoint);

    pipeline.addLast(new ZuulFilterChainHandler(requestFilterChain, responseFilterChain));
}
```
Drilling into ➊, we can see:
```java
public <T extends ZuulMessage> ZuulFilter<T, T>[] getFilters(ZuulFilter<T, T> start, ZuulFilter<T, T> stop) {
    final SortedSet<ZuulFilter<?, ?>> zuulFilters = filterLoader.getFiltersByType(start.filterType());
    final ZuulFilter<T, T>[] filters = new ZuulFilter[zuulFilters.size() + 2];
    filters[0] = start;
    int i = 1;
    for (ZuulFilter<?, ?> filter : zuulFilters) {
        filters[i++] = (ZuulFilter<T, T>) filter;
    }
    filters[filters.length - 1] = stop;
    return filters;
}
```
This returns an array of ZuulFilters whose first and last elements are the start and stop filters, which in this case are the OutboundPassportStampingFilter instances.
Let's return to the addZuulFilterChainHandler() function. It contains three similar passages that obtain the three kinds of filters: inbound, outbound, and endpoint. From the code, the assembly order is:
requestFilters plus the endpoint runner are combined into requestFilterChain;
responseFilters are built into responseFilterChain;
requestFilterChain and responseFilterChain are combined into a ZuulFilterChainHandler;
the ZuulFilterChainHandler is added to the pipeline.
One question remains: where do these filters come from? The answer hides in com.netflix.zuul.DynamicFilterLoader.getFiltersByType; a quick trace leads to:
```java
@Override
public Collection<ZuulFilter<?, ?>> getAllFilters() {
    return Collections.unmodifiableList(new ArrayList<>(filters.values()));
}
```
This is where all the filters are obtained, and they appear to be registered via put. A simple breakpoint shows:
```java
@Override
public void put(String key, ZuulFilter<?, ?> filter) {
    filters.putIfAbsent(Objects.requireNonNull(key, "key"), Objects.requireNonNull(filter, "filter"));
}
```
Going one level deeper, let's trace the call flow starting from init:
```java
@Inject
public void init() throws Exception {
    if (!config.enabled) {
        return;
    }

    long startTime = System.currentTimeMillis();

    ThreadFactory tf = new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat("FilterFileManager_ProcessFiles-%d")
            .build();
    this.processFilesService = Executors.newFixedThreadPool(FILE_PROCESSOR_THREADS.get(), tf);

    filterLoader.putFiltersForClasses(config.getClassNames());
    manageFiles();
    startPoller();

    LOG.warn("Finished loading all zuul filters. Duration = {} ms.", (System.currentTimeMillis() - startTime));
}

void manageFiles() {
    try {
        List<File> aFiles = getFiles();
        processGroovyFiles(aFiles);
    } catch (Exception e) {
        String msg = "Error updating groovy filters from disk!";
        LOG.error(msg, e);
        throw new RuntimeException(msg, e);
    }
}

void processGroovyFiles(List<File> aFiles) throws Exception {
    List<Callable<Boolean>> tasks = new ArrayList<>();
    for (File file : aFiles) {
        tasks.add(() -> {
            try {
                return filterLoader.putFilter(file);
            } catch (Exception e) {
                LOG.error("Error loading groovy filter from disk! file = {}", String.valueOf(file), e);
                return false;
            }
        });
    }
    processFilesService.invokeAll(tasks, FILE_PROCESSOR_TASKS_TIMEOUT_SECS.get(), TimeUnit.SECONDS);
}

@Override
public boolean putFilter(File file) {
    if (!filterRegistry.isMutable()) {
        return false;
    }
    try {
        String sName = file.getAbsolutePath();
        if (filterClassLastModified.get(sName) != null
                && (file.lastModified() != filterClassLastModified.get(sName))) {
            LOG.debug("reloading filter {}", sName);
            filterRegistry.remove(sName);
        }
        ZuulFilter<?, ?> filter = filterRegistry.get(sName);
        if (filter == null) {
            Class<?> clazz = compiler.compile(file);
            if (!Modifier.isAbstract(clazz.getModifiers())) {
                filter = filterFactory.newInstance(clazz);
                putFilter(sName, filter, file.lastModified());
                return true;
            }
        }
    } catch (Exception e) {
        LOG.error("Error loading filter! Continuing. file={}", file, e);
        return false;
    }
    return false;
}
```
As you can see, the filters are obtained via their fully qualified class names (and compiled from the Groovy filter files); both are specified in the application's configuration file.
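For reference, the filter packages and Groovy filter directories are configured through properties. The snippet below mirrors what the zuul-sample project ships; the property names and paths are quoted from that sample and should be verified against your Zuul version:

```properties
zuul.filters.root=src/main/groovy/com/netflix/zuul/sample/filters
zuul.filters.locations=${zuul.filters.root}/inbound,${zuul.filters.root}/outbound,${zuul.filters.root}/endpoint
zuul.filters.packages=com.netflix.zuul.filters.common
```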
At this point we understand how Zuul2 turns its own ZuulFilters into a Netty handler and adds it to the Netty pipeline. One question remains: how does a ZuulFilter actually run? In the previous section we already saw that everything ends up in a ZuulFilterChainHandler; the name suggests a chain, so let's keep exploring.
ZuulFilterChainHandler
Whatever is ultimately registered on the Netty pipeline must be a handler, so Netty's channelRead() function is the natural place to start reading.
```java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof HttpRequestMessage) { // ➊
        zuulRequest = (HttpRequestMessage) msg;
        final SessionContext zuulCtx = zuulRequest.getContext();
        zuulCtx.put(CommonContextKeys.NETTY_SERVER_CHANNEL_HANDLER_CONTEXT, ctx);
        requestFilterChain.filter(zuulRequest); // ➋
    } else if ((msg instanceof HttpContent) && (zuulRequest != null)) { // ➌
        requestFilterChain.filter(zuulRequest, (HttpContent) msg);
    } else {
        logger.debug("Received unrecognized message type. {}", msg.getClass().getName());
        ReferenceCountUtil.release(msg); // ➍
    }
}
```
➊ handles messages that have already been converted into HttpRequestMessage; ➋ is the actual filter invocation.
➌ handles messages that are not converted into HttpRequestMessage, i.e. the HttpContent body parts.
➍ covers unrecognized message types: the message is logged and released.
The requestFilterChain here is the ZuulFilterChainRunner we passed in earlier. Let's see what its filter() function does:
```java
@Override
public void filter(final T inMesg) {
    try (TaskCloseable ignored = PerfMark.traceTask(this, s -> s.getClass().getSimpleName() + ".filter")) {
        addPerfMarkTags(inMesg);
        runFilters(inMesg, initRunningFilterIndex(inMesg));
    }
}

private final void runFilters(final T mesg, final AtomicInteger runningFilterIdx) {
    T inMesg = mesg;
    String filterName = "-";
    try {
        Preconditions.checkNotNull(mesg, "Input message");
        int i = runningFilterIdx.get(); // ➊
        while (i < filters.length) {
            final ZuulFilter<T, T> filter = filters[i]; // ➋
            filterName = filter.filterName();
            final T outMesg = filter(filter, inMesg); // ➌
            if (outMesg == null) {
                return;
            }
            inMesg = outMesg;
            i = runningFilterIdx.incrementAndGet(); // ➍
        }
        invokeNextStage(inMesg); // ➎
    } catch (Exception ex) {
        // ...
    }
}
```
➊ gets the index of the currently running filter; ➋ fetches the corresponding ZuulFilter; ➌ invokes that ZuulFilter; ➍ increments the index and continues the loop; ➎ invokes the next stage once every filter in the array (ending with the FILTERS_INBOUND_END stamping filter we saw being constructed earlier) has run.
From this code we can see that Zuul2's chain is driven by the chain runner. Next, let's look at the filter() function in BaseZuulFilterRunner.
Ignoring some secondary code, here is the core logic. The invocation is either synchronous or asynchronous. Synchronous:
```java
if (filter.getSyncType() == FilterSyncType.SYNC) {
    final SyncZuulFilter<I, O> syncFilter = (SyncZuulFilter) filter;
    final O outMesg;
    try (TaskCloseable ignored2 = PerfMark.traceTask(filter, f -> f.filterName() + ".apply")) {
        addPerfMarkTags(inMesg);
        outMesg = syncFilter.apply(inMesg);
    }
    recordFilterCompletion(ExecutionStatus.SUCCESS, filter, startTime, inMesg, snapshot);
    return (outMesg != null) ? outMesg : filter.getDefaultOutput(inMesg);
}
```
Asynchronous:
```java
// excerpt from BaseZuulFilterRunner.filter(...) -- the async branch
try {
    // ...
    try (TaskCloseable ignored2 = PerfMark.traceTask(filter, f -> f.filterName() + ".applyAsync")) {
        final Link nettyToSchedulerLink = PerfMark.linkOut();
        filter.incrementConcurrency();
        resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);
        filter.applyAsync(inMesg)
                .doOnSubscribe(() -> {
                    try (TaskCloseable ignored3 =
                            PerfMark.traceTask(filter, f -> f.filterName() + ".onSubscribeAsync")) {
                        PerfMark.linkIn(nettyToSchedulerLink);
                    }
                })
                .doOnNext(resumer.onNextStarted(nettyToSchedulerLink))
                .doOnError(resumer.onErrorStarted(nettyToSchedulerLink))
                .doOnCompleted(resumer.onCompletedStarted(nettyToSchedulerLink))
                .observeOn(Schedulers.from(getChannelHandlerContext(inMesg).executor()))
                .doOnUnsubscribe(resumer::decrementConcurrency)
                .subscribe(resumer);
    }
    return null;
} catch (Throwable t) {
    if (resumer != null) {
        resumer.decrementConcurrency();
    }
    final O outMesg = handleFilterException(inMesg, filter, t);
    outMesg.finishBufferedBodyIfIncomplete();
    recordFilterCompletion(ExecutionStatus.FAILED, filter, startTime, inMesg, snapshot);
    return outMesg;
}
```
ProxyEndpoint
From the analysis so far we understand Zuul's invocation-chain model and how the pre-filters run. But where does the actual remote call happen?
The official wiki tells us:
Zuul does not use Ribbon for making outgoing connections and instead uses its own connection pool, using a Netty client. Zuul creates a connection pool per host, per event loop. It does this in order to reduce context switching between threads and to ensure sanity for both the inbound event loops and outbound event loops. The result is that the entire request is run on the same thread, regardless of which event loop is running it.
From the wiki we learn that Zuul2 no longer uses Ribbon for outgoing connections; instead it uses its own connection pool built on a Netty client.
```java
protected ZuulFilter<HttpRequestMessage, HttpResponseMessage> getEndpoint(
        final String endpointName, final HttpRequestMessage zuulRequest) {
    // ... (excerpt)
    if (PROXY_ENDPOINT_FILTER_NAME.equals(endpointName)) {
        return newProxyEndpoint(zuulRequest);
    }
    // ...
    return filter;
}

protected ZuulFilter<HttpRequestMessage, HttpResponseMessage> newProxyEndpoint(HttpRequestMessage zuulRequest) {
    return new ProxyEndpoint(
            zuulRequest, getChannelHandlerContext(zuulRequest), getNextStage(), MethodBinding.NO_OP_BINDING);
}
```
The concrete logic lives in the ProxyEndpoint class.
Let's start from the filter's core logic, apply():
```java
public HttpResponseMessage apply(final HttpRequestMessage input) {
    try {
        if (origin == null) {
            handleNoOriginSelected();
            return null;
        }
        origin.onRequestExecutionStart(zuulRequest);
        proxyRequestToOrigin(); // ➊
        return null;
    } catch (Exception ex) {
        handleError(ex);
        return null;
    }
}
```
➊ forwards the request to the origin.
```java
private void proxyRequestToOrigin() {
    Promise<PooledConnection> promise = null;
    try {
        promise = origin.connectToOrigin(
                zuulRequest, channelCtx.channel().eventLoop(), attemptNum, passport, chosenServer, chosenHostAddr); // ➊
        if (promise.isDone()) {
            operationComplete(promise); // ➋
        } else {
            promise.addListener(this);
        }
    } catch (Exception ex) {
        // ...
    }
}
```
➊ wraps the request and connects to the origin address, obtaining a Promise. ➋ handles the completed Promise; operationComplete() contains the code executed on success. As for connectToOrigin, this is where Zuul wraps the Netty client:
```java
private void writeClientRequestToOrigin(final PooledConnection conn, Duration readTimeout) {
    final Channel ch = conn.getChannel(); // ➊
    passport.setOnChannel(ch);

    ch.attr(ClientTimeoutHandler.ORIGIN_RESPONSE_READ_TIMEOUT).set(readTimeout);

    context.put(CommonContextKeys.ORIGIN_CHANNEL, ch);
    context.set(POOLED_ORIGIN_CONNECTION_KEY, conn);

    preWriteToOrigin(chosenServer.get(), zuulRequest);

    final ChannelPipeline pipeline = ch.pipeline();
    originResponseReceiver = getOriginResponseReceiver();
    pipeline.addBefore(
            DefaultOriginChannelInitializer.CONNECTION_POOL_HANDLER,
            OriginResponseReceiver.CHANNEL_HANDLER_NAME,
            originResponseReceiver);

    ch.write(zuulRequest); // ➋
    writeBufferedBodyContent(zuulRequest, ch);
    ch.flush(); // ➌

    syncClientAndOriginChannels(channelCtx.channel(), ch);

    ch.read(); // ➍
    originConn = conn;
    channelCtx.read(); // ➎
}
```
➊ obtains the established connection; ➋ writes Zuul's request (i.e. the user's request) to the origin channel; ➌ flushes the message out; ➍ starts reading the response data, which triggers the outbound handlers; ➎ resumes reading on the client-side channel.
After that, the response filter chain takes over.
Summary
The overall logic of Zuul2, as worked out in this post:
ZuulFilters are divided into inbound, outbound, and endpoint filters.
Inbound, outbound, and endpoint filters are wrapped into chain runners.
The chain runners are combined into a ZuulFilterChainHandler, which is a Netty handler.
The ZuulFilterChainHandler is attached to the Netty pipeline; from that point on, it is the standard Netty flow.
References