网关分类

常见的开源网关按照语言分类有如下几类:

  • Nginx+Lua: OpenResty、Kong 等;

  • Java: Zuul1/Zuul2、Spring Cloud Gateway、gravitee-gateway、Shenyu 等;

  • Go: janus、GoKu API Gateway 等;

  • Node.js: Express Gateway、

    MicroGateway 等。

我们主要考虑 Java 语言的网关。接下来调研了Zuul1、Zuul2、Spring Cloud Gateway、Dromara Soul。

业界主流的网关基本上可以分为下面四种:

  • Servlet + 线程池

  • NIO(Tomcat / Jetty) + Servlet 3.0 异步

  • Spring WebFlux

  • NettyServer + NettyClient

在进行技术选型的时候,主要考虑功能丰富度、性能、稳定性。Zuul2 是一个基于 Netty 框架的异步非阻塞的高性能网关。

Zuul2 整体概览

接下来我们简要介绍一下 Zuul2 关键知识点。

Zuul2 的架构图:

为了解释上面这张图,接下来会分别介绍几个点

  • 如何解析 HTTP 协议

  • Zuul2 的数据流转

  • 两个责任链:Netty ChannelPipeline责任链 + Filter责任链

如何解析 HTTP 协议

学习Zuul2需要一定的铺垫知识,比如:Google Guice、RxJava、Netflix archaius等,但是更关键的应该是:如何解析HTTP协议,会影响到后续Filter责任链的原理解析,为此先分析这个关键点。

首先我们介绍官方文档中的一段话:

By default Zuul doesn’t buffer body content, meaning it streams the received headers to the origin before the body has been received.

This streaming behavior is very efficient and desirable, as long as your filter logic depends on header data.

翻译成中文:

默认情况下Zuul2并不会缓存请求体,也就意味着它可能会先发送接收到的请求Headers到后端服务,之后接收到请求体再继续发送到后端服务,发送请求体的时候,也不是组装为一个完整数据之后才发,而是接收到一部分,就转发一部分。

这个流式行为是高效的,只要Filter过滤的时候只依赖Headers的数据进行逻辑处理,而不需要解析RequestBody。

上面这段话映射到Netty Handler中,则意味着Zuul2并没有使用HttpObjectAggregator。

我们先看一下常规的Netty Server处理HTTP协议的样例:

NettyServer样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class ConfigServerBootstrap {

public static final int WORKER_THREAD_COUNT = Runtime.getRuntime().availableProcessors();

public void start(){
int port = 8080;
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(WORKER_THREAD_COUNT);

final BizServerHandler bizServerHandler = new BizServerHandler();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(10, 10, 0));
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(500 * 1024 * 1024));
pipeline.addLast(bizServerHandler);
}
});
log.info("start netty server, port:{}", port);
serverBootstrap.bind(port).sync();
} catch (InterruptedException e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
log.error(String.format("start netty server error, port:%s", port), e);
}
}
}

这个例子中的两个关键类为:HttpServerCodec、HttpObjectAggregator。

HttpServerCodec是HttpRequestDecoder、HttpResponseEncoder的组合器。

  • HttpRequestDecoder职责: 将输入的ByteBuf解析成HttpRequest、HttpContent对象。

  • HttpResponseEncoder职责:

    将HttpResponse、HttpContent对象转换为ByteBuf,进行网络二进制流的输出。

HttpObjectAggregator的作用:组装HttpMessage、HttpContent为一个完整的FullHttpRequest或者FullHttpResponse。

当你不想关心chunked分块传输的时候,使用HttpObjectAggregator是非常有用的。

HTTP协议通常使用Content-Length来标识body的长度,在服务器端,需要先申请对应长度的buffer,然后再赋值。如果需要一边生产数据一边发送数据,就需要使用”Transfer-Encoding: chunked” 来代替Content-Length,也就是对数据进行分块传输。

接下来我们看一下Zuul2为了解析HTTP协议做了哪些处理。

Zuul的源码:https://github.com/Netflix/zuul,基于v2.1.5。

1
2
3
4
5
6
7
8
// com.netflix.zuul.netty.server.BaseZuulChannelInitializer#addHttp1Handlers
protected void addHttp1Handlers(ChannelPipeline pipeline) {
pipeline.addLast(HTTP_CODEC_HANDLER_NAME, createHttpServerCodec());

pipeline.addLast(new Http1ConnectionCloseHandler(connCloseDelay));
pipeline.addLast("conn_expiry_handler",
new Http1ConnectionExpiryHandler(maxRequestsPerConnection, maxRequestsPerConnectionInBrownout, connectionExpiry));
}
1
2
3
4
5
6
7
8
9
// com.netflix.zuul.netty.server.BaseZuulChannelInitializer#createHttpServerCodec
protected HttpServerCodec createHttpServerCodec() {
return new HttpServerCodec(
MAX_INITIAL_LINE_LENGTH.get(),
MAX_HEADER_SIZE.get(),
MAX_CHUNK_SIZE.get(),
false
);
}

通过对比上面的样例发现,Zuul2并没有添加HttpObjectAggregator,也就是需要自行去处理chunked分块传输问题、自行组装请求体数据。

为了解决上面说的chunked分块传输问题,Zuul2通过判断是否LastHttpContent,来判断是否接收完成。

Zuul2 数据流转

如上图所示,Netty自带的HttpServerCodec会将网络二进制流转换为Netty的HttpRequest对象,再通过ClientRequestReceiver编解码器将HttpRequest转换为Zuul的请求对象HttpRequestMessageImpl;

请求体RequestBody在Netty自带的HttpServerCodec中被映射为HttpContent对象,ClientRequestReceiver编解码器依次接收HttpContent对象。

完成了上述数据的转换之后,就流转到了最重要的编解码ZuulFilterChainHandler,里面会执行Filter链,也会发起网络请求到真正的后端服务,这一切都是在ZuulFilterChainHandler中完成的。

得到了后端服务的响应结果之后,也经过了Outbound Filter的过滤,接下来就是通过ClientResponseWriter把Zuul自定义的响应对象HttpResponseMessageImpl转换为Netty的HttpResponse对象,然后通过HttpServerCodec转换为ByteBuf对象,发送网络二进制流,完成响应结果的输出。

这里需要特别说明的是:由于Zuul2默认不组装一个完整的请求对象/响应对象,所以Zuul2是分别针对请求头+请求Headers、请求体进行Filter过滤拦截的,也就是说对于请求,会走两遍前置Filter链,对于响应结果,也是会走两遍后置Filter链拦截

两个责任链

Netty ChannelPipeline责任链

Netty的ChannelPipeline设计,通过往ChannelPipeline中动态增减Handler进行定制扩展。

接下来看一下Zuul2 Netty Server中的pipeline有哪些Handler?

接着继续看一下Zuul2 Netty Client的Handler有哪些?

本文不针对具体的Handler进行详细解释,主要是给大家一个整体的视图。

Filter责任链

请求发送到Netty Server中,先进行Inbound Filters的拦截处理,接着会调用Endpoint Filter,这里默认为ProxyEndPoint(里面封装了Netty Client),发送请求到真实后端服务,获取到响应结果之后,再执行Outbound Filters,最终返回响应结果。

三种类型的Filter之间是通过nextStage属性来衔接的。

Zuul2存在一个定时任务线程GroovyFilterFileManagerPoller,定期扫描特定的目录,通过比对文件的更新时间戳,来判断是否发生变化,如果有变化,则重新编译并放入到内存中。

通过定位任务实现了Filter的动态加载。

源码剖析

从整体来看,Zuul2是一个在 Netty 上运行一系列Filter的服务,执行完成PreFilter (inbound filters)之后将请求通过 Netty Client 转发出去,然后将请求的结果通过一系列PostFilter (outbound filters) 返回。

Zuul2的Filter分为三种类型:

  • Inbound Filters: 在路由之前执行
  • Endpoint Filters: 路由操作
  • Outbound Filters: 得到相应数据之后执行。

我们用官方的Demo进行分析 zuul-sample,如下图所示:

image-20241005110305880

ServerStartup

在Demo的启动中,我们发现启动的入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try {
Injector injector = Guice.createInjector(new ZuulSampleModule());
BaseServerStartup serverStartup = injector.getInstance(BaseServerStartup.class);
server = serverStartup.server();

server.start();
long startupDuration = System.nanoTime() - startNanos;
logger.info(
"Zuul Sample: finished startup. Duration = {}ms", TimeUnit.NANOSECONDS.toMillis(startupDuration));
server.awaitTermination();
} catch (Throwable t) {
...
} finally {
...
}

首先使用的是Google的Guice进行依赖注入,拿到一个BaseServerStartup对象, 然后启动一个服务。

我们从这个 start() 作为我们的突破口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void start() {
if (jvmShutdownHook != null) {
Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
}

serverGroup = new ServerGroup(
"Salamander", eventLoopConfig.acceptorCount(), eventLoopConfig.eventLoopCount(), eventLoopGroupMetrics);
serverGroup.initializeTransport();
try {
List<ChannelFuture> allBindFutures = new ArrayList<>(addressesToInitializers.size());

// Setup each of the channel initializers on requested ports.
for (Map.Entry<NamedSocketAddress, ? extends ChannelInitializer<?>> entry :
addressesToInitializers.entrySet()) {
NamedSocketAddress requestedNamedAddr = entry.getKey();
ChannelFuture nettyServerFuture = setupServerBootstrap(requestedNamedAddr, entry.getValue());
Channel chan = nettyServerFuture.channel();
addressesToChannels.put(requestedNamedAddr.withNewSocket(chan.localAddress()), chan);
allBindFutures.add(nettyServerFuture);
}

// All channels should share a single ByteBufAllocator instance.
// Add metrics to monitor that allocator's memory usage.
if (!allBindFutures.isEmpty()) {
ByteBufAllocator alloc = allBindFutures.get(0).channel().alloc();
if (alloc instanceof ByteBufAllocatorMetricProvider) {
ByteBufAllocatorMetric metrics = ((ByteBufAllocatorMetricProvider) alloc).metric();
PolledMeter.using(registry)
.withId(registry.createId("zuul.nettybuffermem.live", "type", "heap"))
.monitorValue(metrics, ByteBufAllocatorMetric::usedHeapMemory);
PolledMeter.using(registry)
.withId(registry.createId("zuul.nettybuffermem.live", "type", "direct"))
.monitorValue(metrics, ByteBufAllocatorMetric::usedDirectMemory);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

可以看到,这里主要是绑定了future的回调关系,然后有一个函数setupServerBootstrap,我们知道Netty需要使用 ServerBootstrap 进行 端口绑定,那么重点关注这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private ChannelFuture setupServerBootstrap(
NamedSocketAddress listenAddress, ChannelInitializer<?> channelInitializer) throws InterruptedException {
ServerBootstrap serverBootstrap =
new ServerBootstrap().group(serverGroup.clientToProxyBossPool, serverGroup.clientToProxyWorkerPool);

LOG.info("Proxy listening with {}", serverGroup.channelType);
serverBootstrap.channel(serverGroup.channelType);

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_LINGER, -1);
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

// Apply transport specific socket options.
for (Map.Entry<ChannelOption<?>, ?> optionEntry : serverGroup.transportChannelOptions.entrySet()) {
serverBootstrap = serverBootstrap.option((ChannelOption) optionEntry.getKey(), optionEntry.getValue());
}

serverBootstrap.handler(new NewConnHandler());
serverBootstrap.childHandler(channelInitializer);
serverBootstrap.validate();

LOG.info("Binding to : {}", listenAddress);

if (MANUAL_DISCOVERY_STATUS.get()) {
// Flag status as UP just before binding to the port.
serverStatusManager.localStatus(InstanceInfo.InstanceStatus.UP);
}

// Bind and start to accept incoming connections.
ChannelFuture bindFuture = serverBootstrap.bind(listenAddress.unwrap());

try {
return bindFuture.sync();
} catch (Exception e) {
// sync() sneakily throws a checked Exception, but doesn't declare it. This can happen if there is a bind
// failure, which is typically an IOException. Just chain it and rethrow.
throw new RuntimeException("Failed to bind on addr " + listenAddress, e);
}
}

可以看到其中创建了一个serverBootstrap对象,然后对其进行了参数设置,这个正是Netty的启动类,是zuul2的核心。最后调用bindFuture.sync()进行同步等待。

我们已经发现了Zuul2是如何启动一个Netty服务的,那我们接下来去了解最为重要的这些Filter是如何工作的,我们在上文启动中已经发现一个很重要的对象 channelInitializer

1
serverBootstrap.childHandler(channelInitializer);

我们知道在Netty中,是将一系列的 Handler 聚合在一起并使用 Pipeline 执行,那么我们从这个 channelInitializer 入手去研究。

channelInitializer是作为参数传入setupServerBootstrap函数的,我们找到调用方,发现是个MapaddressesToInitializers中的key,再找到这个 Map 的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//定义
private final Map<NamedSocketAddress, ? extends ChannelInitializer<?>> addressesToInitializers;

//初始化,在Server的构造方法中构造,在Server被初始化时传入
public Server(
Registry registry,
ServerStatusManager serverStatusManager,
Map<NamedSocketAddress, ? extends ChannelInitializer<?>> addressesToInitializers,
ClientConnectionsShutdown clientConnectionsShutdown,
EventLoopGroupMetrics eventLoopGroupMetrics,
EventLoopConfig eventLoopConfig,
Thread jvmShutdownHook) {
this.registry = Objects.requireNonNull(registry);
this.addressesToInitializers = Collections.unmodifiableMap(new LinkedHashMap<>(addressesToInitializers;//初始化
this.serverStatusManager = Preconditions.checkNotNull(serverStatusManager, "serverStatusManager");
this.clientConnectionsShutdown =
Preconditions.checkNotNull(clientConnectionsShutdown, "clientConnectionsShutdown");
this.eventLoopConfig = Preconditions.checkNotNull(eventLoopConfig, "eventLoopConfig");
this.eventLoopGroupMetrics = Preconditions.checkNotNull(eventLoopGroupMetrics, "eventLoopGroupMetrics");
this.jvmShutdownHook = jvmShutdownHook;
}

接下来,找到初始化Server的地方,在BaseServerStartupinit方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Inject
public void init() throws Exception {
ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
clientConnectionsShutdown =
new ClientConnectionsShutdown(clientChannels, GlobalEventExecutor.INSTANCE, discoveryClient);

//构造addrsToChannelInitializers
addrsToChannelInitializers = chooseAddrsAndChannels(clientChannels);

server = new Server(
registry,
serverStatusManager,
addrsToChannelInitializers,
clientConnectionsShutdown,
eventLoopGroupMetrics,
eventLoopConfig);
}

可以发现,我们心心念念的addrsToChannelInitializers通过函数chooseAddrsAndChannels构造。

继续深入:

1
2
3
4
5
6
7
8
9
10
11
12
@ForOverride
protected Map<NamedSocketAddress, ChannelInitializer<?>> chooseAddrsAndChannels(ChannelGroup clientChannels) {
@SuppressWarnings("unchecked") // Channel init map has the wrong generics and we can't fix without api breakage.
Map<Integer, ChannelInitializer<?>> portMap =
(Map<Integer, ChannelInitializer<?>>) (Map) choosePortsAndChannels(clientChannels);
return Server.convertPortMap(portMap);
}

@Deprecated
protected Map<Integer, ChannelInitializer> choosePortsAndChannels(ChannelGroup clientChannels) {
throw new UnsupportedOperationException("unimplemented");
}

发现该方法是protected的,需要由子类实现,否则会报错。

image-20241005114647001

进入到ServerStartup中,可以发现,正是channelInitializer的构造逻辑,这里笔者为节省篇幅,只给出 HTTP 的示例,其他协议类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
switch (SERVER_TYPE) {
case HTTP:
channelConfig.set(
CommonChannelConfigKeys.allowProxyHeadersWhen,
StripUntrustedProxyHeadersHandler.AllowWhen.ALWAYS);
channelConfig.set(CommonChannelConfigKeys.preferProxyProtocolForClientIp, false);
channelConfig.set(CommonChannelConfigKeys.isSSlFromIntermediary, false);
channelConfig.set(CommonChannelConfigKeys.withProxyProtocol, false);

addrsToChannels.put(
new NamedSocketAddress("http", sockAddr),
new ZuulServerChannelInitializer(metricId, channelConfig, channelDependencies, clientChannels) {
@Override
protected void addHttp1Handlers(ChannelPipeline pipeline) {
super.addHttp1Handlers(pipeline);
pipeline.addLast(new HttpContentCompressor((CompressionOptions[]) null));
}
});
logAddrConfigured(sockAddr);
break;

case HTTP2:
case HTTP_MUTUAL_TLS:
case WEBSOCKET:
case SSE:
}

return Collections.unmodifiableMap(addrsToChannels);

ZuulServerChannelInitializer

我们可以发现 ChannelInitializer 其实是 ZuulServerChannelInitializer 对象。在initChannel中我们发现了

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
protected void initChannel(Channel ch) throws Exception {
// Configure our pipeline of ChannelHandlerS.
ChannelPipeline pipeline = ch.pipeline();

storeChannel(ch);
addTimeoutHandlers(pipeline);
addPassportHandler(pipeline);
addTcpRelatedHandlers(pipeline);
addHttp1Handlers(pipeline);
addHttpRelatedHandlers(pipeline);
addZuulHandlers(pipeline);
}

在前面的都比较简单都是一些标准的 Handler,最为重要是 addZuulHandlers(pipeline); 这个函数,我们继续深入。

1
2
3
4
5
6
7
protected void addZuulHandlers(final ChannelPipeline pipeline) {
pipeline.addLast("logger", nettyLogger);
pipeline.addLast(new ClientRequestReceiver(sessionContextDecorator));
pipeline.addLast(passportLoggingHandler);
addZuulFilterChainHandler(pipeline);
pipeline.addLast(new ClientResponseWriter(requestCompleteHandler, registry));
}

上面的都很容易看出来,是日志,Session之类的Handler,最为重要的是addZuulFilterChainHandler,此处增加了 ZuulFilter。

继续深入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected void addZuulFilterChainHandler(final ChannelPipeline pipeline) {
final ZuulFilter<HttpResponseMessage, HttpResponseMessage>[] responseFilters = getFilters(
new OutboundPassportStampingFilter(PassportState.FILTERS_OUTBOUND_START),
new OutboundPassportStampingFilter(PassportState.FILTERS_OUTBOUND_END));➊

// response filter chain
final ZuulFilterChainRunner<HttpResponseMessage> responseFilterChain =
getFilterChainRunner(responseFilters, filterUsageNotifier);

// endpoint | response filter chain
final FilterRunner<HttpRequestMessage, HttpResponseMessage> endPoint =
getEndpointRunner(responseFilterChain, filterUsageNotifier, filterLoader);

final ZuulFilter<HttpRequestMessage, HttpRequestMessage>[] requestFilters = getFilters(
new InboundPassportStampingFilter(PassportState.FILTERS_INBOUND_START),
new InboundPassportStampingFilter(PassportState.FILTERS_INBOUND_END));

// request filter chain | end point | response filter chain
final ZuulFilterChainRunner<HttpRequestMessage> requestFilterChain =
getFilterChainRunner(requestFilters, filterUsageNotifier, endPoint);

pipeline.addLast(new ZuulFilterChainHandler(requestFilterChain, responseFilterChain));
}

从 ➊ 深入可以看到:

1
2
3
4
5
6
7
8
9
10
11
12
public <T extends ZuulMessage> ZuulFilter<T, T>[] getFilters(ZuulFilter<T, T> start, ZuulFilter<T, T> stop) {
final SortedSet<ZuulFilter<?, ?>> zuulFilters = filterLoader.getFiltersByType(start.filterType());
final ZuulFilter<T, T>[] filters = new ZuulFilter[zuulFilters.size() + 2];
filters[0] = start;
int i = 1;
for (ZuulFilter<?, ?> filter : zuulFilters) {
// TODO(carl-mastrangelo): find some way to make this cast not needed.
filters[i++] = (ZuulFilter<T, T>) filter;
}
filters[filters.length - 1] = stop;
return filters;
}

这里返回了一个 ZuulFilter 的数组,开始分别是 startstop 对应的刚好是 OutboundPassportStampingFilter

然我们继续回到 addZuulFilterChainHandler() 函数上来,我们发现有三段相似的代码正好对应着获得了 InBound OutBond EndPoint 这三种Filter,在代码我们可以看出顺序是

  1. requestFiltersendPointFilters 合并成 requestFilterChain
  2. responseFilters 构建成 responseFilterChain
  3. requestFilterChainresponseFilterChain 组合成 ZuulFilterChainHandler
  4. ZuulFilterChainHandler 添加至 pipeline

那这里我们还有一个疑问,这些Filter是从何而来的?这个答案隐藏在 com.netflix.zuul.DynamicFilterLoader.getFiltersByType 中,通过简单的跟踪我们可以得到

1
2
3
4
@Override
public Collection<ZuulFilter<?, ?>> getAllFilters() {
return Collections.unmodifiableList(new ArrayList<>(filters.values()));
}

在这里 获得所有的Fiter,而这里的Filter看起来是通过 put进来的,通过一个简单的断点,我们就可以发现:

1
2
3
4
@Override
public void put(String key, ZuulFilter<?, ?> filter) {
filters.putIfAbsent(Objects.requireNonNull(key, "key"), Objects.requireNonNull(filter, "filter"));
}

再深入,我们从init理一下调用逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@Inject
public void init() throws Exception {
if (!config.enabled) {
return;
}

long startTime = System.currentTimeMillis();
ThreadFactory tf = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("FilterFileManager_ProcessFiles-%d")
.build();
this.processFilesService = Executors.newFixedThreadPool(FILE_PROCESSOR_THREADS.get(), tf);

//静态
filterLoader.putFiltersForClasses(config.getClassNames());
//动态
manageFiles();
startPoller();

LOG.warn("Finished loading all zuul filters. Duration = {} ms.", (System.currentTimeMillis() - startTime));
}

void manageFiles() {
try {
List<File> aFiles = getFiles();
processGroovyFiles(aFiles);
} catch (Exception e) {
String msg = "Error updating groovy filters from disk!";
LOG.error(msg, e);
throw new RuntimeException(msg, e);
}
}

void processGroovyFiles(List<File> aFiles) throws Exception {

List<Callable<Boolean>> tasks = new ArrayList<>();
for (File file : aFiles) {
tasks.add(() -> {
try {
return filterLoader.putFilter(file);
} catch (Exception e) {
LOG.error("Error loading groovy filter from disk! file = {}", String.valueOf(file), e);
return false;
}
});
}
//定时任务动态扫描
processFilesService.invokeAll(tasks, FILE_PROCESSOR_TASKS_TIMEOUT_SECS.get(), TimeUnit.SECONDS);
}

@Override
public boolean putFilter(File file) {
if (!filterRegistry.isMutable()) {
return false;
}
try {
String sName = file.getAbsolutePath();
if (filterClassLastModified.get(sName) != null
&& (file.lastModified() != filterClassLastModified.get(sName))) {
LOG.debug("reloading filter {}", sName);
filterRegistry.remove(sName);
}
ZuulFilter<?, ?> filter = filterRegistry.get(sName);
if (filter == null) {
Class<?> clazz = compiler.compile(file);
if (!Modifier.isAbstract(clazz.getModifiers())) {
filter = filterFactory.newInstance(clazz);
putFilter(sName, filter, file.lastModified());
return true;
}
}
} catch (Exception e) {
LOG.error("Error loading filter! Continuing. file={}", file, e);
return false;
}

return false;
}

可以看到:我们通过类的全称限定类名获得的这个Fitler,这个配置是在我们的配置文件中配置的。

至此,我们已经明白了Zuul2如何将自己的 ZuulFilter 变换成 Netty Handler 并添加到 Netty Pipeline 之中的,那我们还剩下一个问题,这个 ZuulFilter 是如何运作的。但是我们在上段中,我们已经发现了最后是一个 ZuulFilterChainHandler 通过名称我们可以推测出,这是一个 Chain 链,我们继续往下探索吧。

ZuulFilterChainHandler

我们知道,最终注册到 Netty Pipeline 上的最终肯定是 Handler, 我们只需要从 Netty 的 channelRead() 函数作为突破口去阅读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequestMessage) { ➊
zuulRequest = (HttpRequestMessage) msg;

// Replace NETTY_SERVER_CHANNEL_HANDLER_CONTEXT in SessionContext
final SessionContext zuulCtx = zuulRequest.getContext();
zuulCtx.put(CommonContextKeys.NETTY_SERVER_CHANNEL_HANDLER_CONTEXT, ctx);

requestFilterChain.filter(zuulRequest);➋
} else if ((msg instanceof HttpContent) && (zuulRequest != null)) { ➌
requestFilterChain.filter(zuulRequest, (HttpContent) msg);
} else {
logger.debug(
"Received unrecognized message type. {}", msg.getClass().getName());
ReferenceCountUtil.release(msg);➍
}
}

➊ 这段逻辑处理 已经被转化为 HttpRequestMessage 类型的消息,➋ 是实际上的 filter 处理逻辑。

➌ 处理还没被转化为 HttpRequestMessage 类型的消息。

➍ 无法处理抛出异常,释放MSG。

而这里的 requestFilterChain 就是之前我们传入进去的 ZuulFilterChainRunner 我们看看这 filter() 函数做了什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//com.netflix.zuul.netty.filter.ZuulFilterChainRunner#filter
@Override
public void filter(final T inMesg) {
try (TaskCloseable ignored = PerfMark.traceTask(this, s -> s.getClass().getSimpleName() + ".filter")) {
addPerfMarkTags(inMesg);
runFilters(inMesg, initRunningFilterIndex(inMesg));
}
}

//com.netflix.zuul.netty.filter.ZuulFilterChainRunner#runFilters
private final void runFilters(final T mesg, final AtomicInteger runningFilterIdx) {
T inMesg = mesg;
String filterName = "-";
try {
Preconditions.checkNotNull(mesg, "Input message");
int i = runningFilterIdx.get(); ➊

while (i < filters.length) {
final ZuulFilter<T, T> filter = filters[i]; ➋
filterName = filter.filterName();
final T outMesg = filter(filter, inMesg); ➌
if (outMesg == null) {
return; //either async filter or waiting for the message body to be buffered
}
inMesg = outMesg;
i = runningFilterIdx.incrementAndGet(); ➍
}
invokeNextStage(inMesg); ➎
}
catch (Exception ex) {
}
}

➊ 获得当前运行的Filter的下标值,➋ 获得对应的 ZuulFilter,➌ 调用 ZuulFilter 进行处理,➍ 将下标志值 +1,继续循环体,➎ 执行下个阶段,这里对应着我们自己再构建 new InboundPassportStampingFilter(FILTERS_INBOUND_END)

通过这段代码,我们知道了 Zuul2 的Chain是由 ChainRunner运行。 我们继续看一下 BaseZuulFilterRunner 中的 filter() 函数。

忽略一些次要的代码,我们来看核心逻辑。这里的调用分为同步和异步

1
2
3
4
5
6
7
8
9
10
11
if (filter.getSyncType() == FilterSyncType.SYNC) {
final SyncZuulFilter<I, O> syncFilter = (SyncZuulFilter) filter;
final O outMesg;
try (TaskCloseable ignored2 = PerfMark.traceTask(filter, f -> f.filterName() + ".apply")) {
addPerfMarkTags(inMesg);
outMesg = syncFilter.apply(inMesg);//调用
}
recordFilterCompletion(ExecutionStatus.SUCCESS, filter, startTime, inMesg, snapshot);
//返回调用结果,为null则返回默认值
return (outMesg != null) ? outMesg : filter.getDefaultOutput(inMesg);
}

异步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
        // async filter
try (TaskCloseable ignored2 = PerfMark.traceTask(filter, f -> f.filterName() + ".applyAsync")) {
final Link nettyToSchedulerLink = PerfMark.linkOut();
filter.incrementConcurrency();
resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);
//异步
filter.applyAsync(inMesg)
//注册回调
.doOnSubscribe(() -> {
try (TaskCloseable ignored3 =
PerfMark.traceTask(filter, f -> f.filterName() + ".onSubscribeAsync")) {
PerfMark.linkIn(nettyToSchedulerLink);
}
})
.doOnNext(resumer.onNextStarted(nettyToSchedulerLink))
.doOnError(resumer.onErrorStarted(nettyToSchedulerLink))
.doOnCompleted(resumer.onCompletedStarted(nettyToSchedulerLink))
.observeOn(
Schedulers.from(getChannelHandlerContext(inMesg).executor()))//获取当前的所在的 EventExecutor 并在这个线程上观察
.doOnUnsubscribe(resumer::decrementConcurrency)
.subscribe(resumer);//将数据在 resumer 中消费
}

return null; // wait for the async filter to finish
} catch (Throwable t) {
if (resumer != null) {
resumer.decrementConcurrency();
}
final O outMesg = handleFilterException(inMesg, filter, t);
outMesg.finishBufferedBodyIfIncomplete();
recordFilterCompletion(ExecutionStatus.FAILED, filter, startTime, inMesg, snapshot);
return outMesg;
}
}

ProxyEndpoint

通过上面的一系列分析,我们已经知道的,Zuul的 调用链模型PreFilters的运行机制。但是,具体的远程调用在哪里体现呢?

在官网的Wiki中,我们也可以获得

Zuul does not use Ribbon for making outgoing connections and instead uses its own connection pool, using a Netty client. Zuul creates a connection pool per host, per event loop. It does this in order to reduce context switching between threads and to ensure sanity for both the inbound event loops and outbound event loops. The result is that the entire request is run on the same thread, regardless of which event loop is running it.

我们从 Wiki 中可以得知,Netty不再默认使用 Ribbon 而是默认使用 Netty 作为一个 Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//com.zuul.netty.filter.ZuulEndPointRunner#getEndpoint
protected ZuulFilter<HttpRequestMessage, HttpResponseMessage> getEndpoint(
final String endpointName, final HttpRequestMessage zuulRequest) {

//...
if (PROXY_ENDPOINT_FILTER_NAME.equals(endpointName)) {
return newProxyEndpoint(zuulRequest);
}
//...

return filter;
}

//com.zuul.netty.filter.ZuulEndPointRunner#newProxyEndpoint
protected ZuulFilter<HttpRequestMessage, HttpResponseMessage> newProxyEndpoint(HttpRequestMessage zuulRequest) {
return new ProxyEndpoint(
zuulRequest, getChannelHandlerContext(zuulRequest), getNextStage(), MethodBinding.NO_OP_BINDING);
}

具体逻辑在ProxyEndpoint这个类中

我们从filter的核心逻辑 apply看起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public HttpResponseMessage apply(final HttpRequestMessage input) {
// If no Origin has been selected, then just return a 404 static response.
// handle any exception here
try {

if (origin == null) {
handleNoOriginSelected();
return null;
}

origin.onRequestExecutionStart(zuulRequest);
proxyRequestToOrigin();➊

// Doesn't return origin response to caller, calls invokeNext() internally in response filter chain
return null;
} catch (Exception ex) {
handleError(ex);
return null;
}
}

➊ 将请求转发至远端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void proxyRequestToOrigin() {
Promise<PooledConnection> promise = null;
try {
//...
promise = origin.connectToOrigin(
zuulRequest, channelCtx.channel().eventLoop(), attemptNum, passport, chosenServer, chosenHostAddr);➊
//...

if (promise.isDone()) {
operationComplete(promise);➋
} else {
promise.addListener(this);
}
} catch (Exception ex) {
//...
}
}

➊ 处将请求包装,连接到远端地址,获得 Promise ➋ 结束的 Promise 处理,在 operationComplete() 中包含了成功的执行代码,至于 connectToOrigin Zuul 包装了 Netty的Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void writeClientRequestToOrigin(final PooledConnection conn, Duration readTimeout) {
final Channel ch = conn.getChannel();➊
passport.setOnChannel(ch);

// set read timeout on origin channel
ch.attr(ClientTimeoutHandler.ORIGIN_RESPONSE_READ_TIMEOUT).set(readTimeout);

context.put(CommonContextKeys.ORIGIN_CHANNEL, ch);
context.set(POOLED_ORIGIN_CONNECTION_KEY, conn);

preWriteToOrigin(chosenServer.get(), zuulRequest);

final ChannelPipeline pipeline = ch.pipeline();
originResponseReceiver = getOriginResponseReceiver();
pipeline.addBefore(
DefaultOriginChannelInitializer.CONNECTION_POOL_HANDLER,
OriginResponseReceiver.CHANNEL_HANDLER_NAME,
originResponseReceiver);

ch.write(zuulRequest);➋
writeBufferedBodyContent(zuulRequest, ch);
ch.flush();➌

// Get ready to read origin's response
syncClientAndOriginChannels(channelCtx.channel(), ch);
ch.read();➍

originConn = conn;
channelCtx.read();➎
}

➊ 获得建立的连接 ➋ 写入Zuul的请求,也就是用户的请求 ➌ 将消息Flush出去 ➍ 在这里读取响应的数据,也就是触发 OutBoundHandler 的处理时间。

之后就继续调用 response filter chain了。

总结

Zuul整体逻辑,我们通过博文可以分析而出。

  1. ZuulFilter 分为 Inbound, Outbound, EndPoint
  2. Inbound, Outbound, EndPoint 包裹成 ChainRunner
  3. ChainRunner 组合成一个 ZuulFilterChainHandler,而 ZuulFilterChainHandler 是Netty的 一个Handler
  4. ZuulFilterChainHandler 会组装到 Netty 的 Pipeline 中,剩下来就是Netty的流程。

参考