阅读完需:约 105 分钟
Netty 是由 JBOSS 提供的一个 Java 开源通讯框架,用以快速开发高性能、高可靠性的网络 IO 程序。它底层很好地封装了 Socket,处理网络通信的一个开源通信框架。通俗说明,Netty 就是解决两个系统之间互相通信的一个框架。
在 Java 领域 Netty 运用非常地广泛,Tomcat、Dubbo、RocketMQ、Zookeeper、Spark、Flink、ElasticSearch
等等这些中间件的网络通讯框架都是基于 Netty 去实现的。
其中 Netty 只是对 Java 原生的通信框架进行了高度的封装
在 Java 语言当中 Netty 基本上是一支独大,涉及到网络通信基本上都是使用 Netty,很少去直接使用 NIO 或者其它类似的第三方框架,目前和 Netty 同一级别的框架主要是 mina,其实 mina 和 Netty 都是同一个人开发的,Netty 比 mina 更晚出现,因此推荐 Netty。
相比 mina 那么 Netty 的优势是什么呢?
- Mina 将内核和一些特性的联系过于紧密,使得用户在不需要这些特性的时候无法脱离,相比之下性能会有所下降,Netty 解决了这个设计问题;
- Netty 的文档更清晰,很多 Mina 的特性在 Netty 里都有;
- Netty 比 Mina 使用起来更简单,如果上手只需要掌握模板代码 + 自定义 Handler 即可;
- 它们的架构差别不大,Mina 靠 apache 生存,而 Netty 靠 jboss。Netty 有对 google protocal buf 的支持,有更完整的 IOC 容器支持。
NIO 的类库和 API 繁杂, 使用麻烦: 需要熟练掌握Selector、 ServerSocketChannel、 SocketChannel、 ByteBuffer
等。
开发工作量和难度都非常大: 例如客户端面临断线重连、 网络闪断、心跳处理、半包读写、 网络拥塞和异常流的处理等等。
Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更 低,减少资源消耗,最小化不必要的内存复制等优点。 Netty 现在都在用的是4.x,5.x版本已经废弃,Netty 4.x 需要JDK 6以上版本支持
Netty示例
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.35.Final</version>
</dependency>
服务端代码
public class NettyServer {
public static void main(String[] args) throws Exception {
// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来配置参数
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
// 使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对workerGroup的SocketChannel设置处理器
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start。。");
// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
// 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = bootstrap.bind(9000).sync();
// 给cf注册监听器,监听我们关心的事件
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});*/
// 等待服务端监听端口关闭,closeFuture是异步操作
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("客户端连接通道建立完成");
}
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到客户端的消息:" + buf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕处理方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
客户端代码
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是ServerBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("netty client start。。");
//启动客户端去连接服务器端
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
//对通道关闭进行监听
cf.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
//当通道有读取事件时会触发,即服务端发送数据给客户端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
上面是Netty的例子,看完代码,发现Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来,让你可以专注业务的开 发,而不需写一大堆类似NIO的网络处理操作。
Netty线程模型
模型解释:
- Netty 抽象出两组线程池
BossGroup
和WorkerGroup
,BossGroup
专门负责接收客户端的连接,WorkerGroup
专门负责网络的读写 -
BossGroup
和WorkerGroup
类型都是NioEventLoopGroup
-
NioEventLoopGroup
相当于一个事件循环线程组, 这个组中含有多个事件循环线程 , 每一个事件循环线程是NioEventLoop
- 每个
NioEventLoop
都有一个selector
, 用于监听注册在其上的socketChannel
的网络通讯 - 每个
Boss NioEventLoop
线程内部循环执行的步骤有 3 步- 处理
accept
事件 , 与client
建立连接 , 生成NioSocketChannel
- 将
NioSocketChannel
注册到某个worker NIOEventLoop
上的selector
- 处理任务队列的任务 , 即
runAllTasks
- 处理
- 每个
worker NIOEventLoop
线程循环执行的步骤- 轮询注册到自己
selector
上的所有NioSocketChannel
的read, write
事件 - 处理 I/O 事件, 即
read , write
事件, 在对应NioSocketChannel
处理业务 -
runAllTasks
处理任务队列TaskQueue
的任务 ,一些耗时的业务处理一般可以放入TaskQueue
中慢慢处 理,这样不影响数据在pipeline
中的流动处理
- 轮询注册到自己
- 每个
worker NIOEventLoop
处理NioSocketChannel
业务时,会使用pipeline
(管道),管道中维护了很多handler
处理器用来处理channel
中的数据
Netty模块组件
Bootstrap、ServerBootstrap
意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组 件,Netty 中 Bootstrap
类是客户端程序的启动引导类,ServerBootstrap
是服务端启动引导类。
Future、ChannelFuture
、Promise
在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。 但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future
和 ChannelFutures
,Promise
,他们可以注 册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。
ChannelFuture
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
// NIO线程:NioEventLoop 中的线程
.connect(new InetSocketAddress("localhost", 8080));
// 该方法用于等待连接真正建立
channelFuture.sync();
// 获取客户端-服务器之间的Channel对象
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
System.in.read();
}
}
如果去掉channelFuture.sync()
方法,会服务器无法收到hello world
这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()
方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel()
拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端
所以需要通过channelFuture.sync()
方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程
addListener方法
用于异步获取建立连接后的 Channel 和发送数据,通过这种方法可以在NIO线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
// NIO线程:NioEventLoop 中的线程
.connect(new InetSocketAddress("localhost", 8080));
// 当connect方法执行完毕后,也就是连接真正建立后
// 会在NIO线程中调用operationComplete方法
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
}
});
System.in.read();
}
}
处理关闭
public class ReadClient {
public static void main(String[] args) throws InterruptedException {
// 创建EventLoopGroup,使用完毕后关闭
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync();
Channel channel = channelFuture.channel();
Scanner scanner = new Scanner(System.in);
// 创建一个线程用于输入并向服务器发送
new Thread(()->{
while (true) {
String msg = scanner.next();
if ("q".equals(msg)) {
// 关闭操作是异步的,在NIO线程中执行
channel.close();
break;
}
channel.writeAndFlush(msg);
}
}, "inputThread").start();
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("waiting close...");
// 同步等待NIO线程执行完close操作
closeFuture.sync();
// 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的
System.out.println("关闭之后执行一些额外操作...");
// 关闭EventLoopGroup
group.shutdownGracefully();
}
}
当我们要关闭channel时,可以调用channel.close()
方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作
如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现
- 通过
channel.closeFuture()
方法获得对应的ChannelFuture
对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();
// 同步等待NIO线程执行完close操作
closeFuture.sync();
- 调用closeFuture.addListener方法,添加close的后续操作
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 等待channel关闭后才执行的操作
System.out.println("关闭之后执行一些额外操作...");
// 关闭EventLoopGroup
group.shutdownGracefully();
}
});
netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
Future
public class NettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
// 获得 EventLoop 对象
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 50;
}
});
// 主线程中获取结果
System.out.println(Thread.currentThread().getName() + " 获取结果");
System.out.println("getNow " + future.getNow());
System.out.println("get " + future.get());
// NIO线程中异步获取结果
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println(Thread.currentThread().getName() + " 获取结果");
System.out.println("getNow " + future.getNow());
}
});
}
}
Netty中的Future对象,可以通过EventLoop的sumbit()方法得到
- 可以通过Future对象的get方法,阻塞地获取返回结果
- 也可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
- 还可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果
Promise
Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果
public class NettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建EventLoop
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
// 创建Promise对象,用于存放结果
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 自定义线程向Promise中存放结果
promise.setSuccess(50);
}).start();
// 主线程从Promise中获取结果
System.out.println(Thread.currentThread().getName() + " " + promise.get());
}
}
promise 与 future 的区别——future执行完后发送结果,promise中途发送结果(Netty中有太多的场景用promise,因为可以进行异步任务管理,结果的管理)
promise
完整案例
public class NettyPromiseDemo {
public static final int SLEEP_GAP = 5000;
public static String getCurThreadName() {
return Thread.currentThread().getName();
}
static class HotWaterJob implements Runnable //①
{
private final Promise<Object> hotPromise;
public HotWaterJob(Promise<Object> hotPromise) {
this.hotPromise = hotPromise;
}
@Override
public void run() //②
{
try {
Logger.info("开始烧水");
Logger.info("灌上凉水");
Logger.info("放在火上");
//线程睡眠一段时间,代表烧水中
Thread.sleep(SLEEP_GAP);
Logger.info("水开了");
hotPromise.trySuccess(true);
// 发送完成后,继续任务
System.out.println("发送完成后,继续任务");
} catch (InterruptedException e) {
Logger.info(" 发生异常被中断.");
hotPromise.trySuccess(false);
}
Logger.info(" 烧水工作,运行结束.");
}
}
static class WashJob implements Callable<Boolean> {
private final Promise<Object> washPromise;
public WashJob(Promise<Object> washPromise) {
this.washPromise = washPromise;
}
@Override
public Boolean call() throws Exception {
try {
Logger.info("洗茶壶");
Logger.info("洗茶杯");
Logger.info("拿茶叶");
//线程睡眠一段时间,代表清洗中
Thread.sleep(SLEEP_GAP/5);
Logger.info("洗完了");
} catch (InterruptedException e) {
Logger.info(" 清洗工作 发生异常被中断.");
washPromise.trySuccess(true);
return false;
}
Logger.info(" 清洗工作 运行结束.");
washPromise.trySuccess(true);
return true;
}
}
//泡茶线程
static class MainJob implements Runnable {
volatile boolean waterOk = false;
volatile boolean cupOk = false;
int gap = SLEEP_GAP / 10;
@Override
public void run() {
while (true) {
try {
Thread.sleep(gap);
Logger.info("读书中......");
} catch (InterruptedException e) {
Logger.info(getCurThreadName() + "发生异常被中断.");
}
}
}
public void drinkTea() {
if (waterOk && cupOk) {
Logger.info("泡茶喝,茶喝完");
this.waterOk = false;
this.gap = SLEEP_GAP * 100;
} else if (!waterOk) {
Logger.info("烧水 没有完成,没有茶喝了");
} else if (!cupOk) {
Logger.info("洗杯子 没有完成,没有茶喝了");
}
}
}
public static void main(String args[]) {
//新起一个线程,作为泡茶主线程
MainJob mainJob = new MainJob();
// Thread mainThread = new Thread(mainJob);
// mainThread.setName("喝茶线程");
// mainThread.start();
//创建netty 线程池
DefaultEventExecutorGroup npool = new DefaultEventExecutorGroup(2);
Promise<Object> hotPromise = npool.next().newPromise();
//烧水的业务逻辑
Runnable hotJob = new HotWaterJob(hotPromise);
//绑定任务执行完成后的回调,到异步任务
hotPromise.addListener(new GenericFutureListener() {
@Override
public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
if (future.isSuccess()) {
mainJob.waterOk = true;
Logger.info("烧水 完成,尝试着去吃吃茶!");
mainJob.drinkTea();
} else {
mainJob.waterOk = false;
Logger.info("烧水 失败啦!");
}
}
});
//绑定任务执行完成后的回调,到异步任务
Promise<Object> washPromise = npool.next().newPromise();
//清洗的业务逻辑
Callable<Boolean> washJob = new WashJob(washPromise);
washPromise.addListener(new GenericFutureListener() {
@Override
public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
if (future.isSuccess()) {
mainJob.cupOk = true;
Logger.info("杯子洗 完成,尝试着去吃吃茶!");
mainJob.drinkTea();
} else {
mainJob.cupOk = false;
Logger.info("杯子洗不了,没有茶喝了");
}
}
});
//提交烧水的业务逻辑,取到异步任务
io.netty.util.concurrent.Future hotFuture = npool.submit(hotJob);
//提交清洗的业务逻辑,取到异步任务
io.netty.util.concurrent.Future<Boolean> washFuture = npool.submit(washJob);
}
}
在这个例子里,可以很明显的看出来Promise的作用,可以在异步任务还没结束的时候向外传递信息,原本的Future是需要返回结果的需要实现Callable接口,但是如果使用Promise那么可以不需要Callable接口,用Runnable接口也是可以的,因为结果已经不需要任务结束才返回了。
Channel
Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:
- 当前网络连接的通道的状态(例如是否打开?是否已连接?)
- 网络连接的配置参数 (例如接收缓冲区大小)
- 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保 证在调用结束时所请求的 I/O 操作已完成。
- 调用立即返回一个
ChannelFuture
实例,通过注册监听器到ChannelFuture
上,可以 I/O 操作成功、失败或取 消时回调通知调用方。 - 支持关联 I/O 操作与对应的处理程序。 不同协议、不同的阻塞类型的连接都有不同的
Channel
类型与之对应。
常用的 Channel 类型:
-
NioSocketChannel
,异步的客户端 TCP Socket 连接。 -
NioServerSocketChannel
,异步的服务器端 TCP Socket 连接。 -
NioDatagramChannel
,异步的 UDP 连接。 -
NioSctpChannel
,异步的客户端 Sctp 连接。 -
NioSctpServerChannel
,异步的 Sctp 服务器端连接。
这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
Selector
Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。 当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是 否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多 个 Channel 。
NioEventLoop
NioEventLoop
中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop
的 run 方 法,执行 I/O 任务和非 I/O 任务:
I/O 任务,即 selectionKey
中 ready
的事件,如 accept、connect、read、write
等,由 processSelectedKeys
方法触发。 非 IO 任务,添加到 taskQueue
中的任务,如 register0、bind0
等任务,由 runAllTasks
方法触发。
NioEventLoopGroup
NioEventLoopGroup
,主要管理 eventLoop
的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程 (NioEventLoop
)负责处理多个 Channel
上的事件,而一个 Channel
只对应于一个线程。
除了NioEventLoopGroup
还有 EpollEventLoopGroup
,DefaultEventExecutorGroup
等 —— EpollEventLoopGroup
其实就是在linux环境下更高效的 NioEventLoopGroup
DefaultEventExecutorGroup
其实就是Netty的普通的线程池
其实NioEventLoopGroup
也是线程池,因为底层也是实现了Executor
接口,并且是可以执行定时任务的线程池,最最关键的是可以执行IO事件,这是与普通的线程池最大区别
线程池之间的关系
搞清楚DefaultEventExecutorGroup
,ThreadPoolExecutor
,Thread
的关系
DefaultEventExecutorGroup
与ThreadPoolExecutor
,本质都是一样的都是线程池,都是实现了Executor
接口利用execute()
方法来最后执行Thread.start()
所谓的线程池本质都是每个线程Thread.start()
来运行线程—都是通过这个threadFactory
来创建与调用线程(threadFactory.newThread(command).start())
可以多插一句
Future
,Callable
,Runnable
的关系
Callable,Runnable
,都是一样的其实就是放入我们要执行的内容给Executor的execute()方法来执行,而Future只是提供了线程的一些额外方法(比如获取异步结果)
注意的是—DefaultEventExecutorGroup
后面调用的是SingleThreadEventExecutor
里的方法很关键,里面有CAS与BlockingQueue
ChannelHandler
ChannelHandler
是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline
(业务处理链)中的 下一个处理程序。 ChannelHandler
本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:
-
ChannelInboundHandler
用于处理入站 I/O 事件。 -
ChannelOutboundHandler
用于处理出站 I/O 操作。
或者使用以下适配器类:
-
ChannelInboundHandlerAdapter
用于处理入站 I/O 事件。 -
ChannelOutboundHandlerAdapter
用于处理出站 I/O 操作。
ChannelHandlerContext
保存 Channel
相关的所有上下文信息,同时关联一个 ChannelHandler
对象。
ChannelPipline
保存 ChannelHandler
的 List,用于处理或拦截 Channel
的入站事件和出站操作。 ChannelPipeline
实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel
中各 个的 ChannelHandler
如何相互交互。
在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline
与之对应,它们的组成关系如下:
一个 Channel
包含了一个 ChannelPipeline
,而 ChannelPipeline
中又维护了一个由 ChannelHandlerContext
组 成的双向链表,并且每个 ChannelHandlerContext
中又关联着一个 ChannelHandler
。
read
事件(入站事件)和write
事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。
ChannelOption
ChannelOption
提供了一组可以在通道创建或绑定之前使用的选项,用于设置通道的特定属性,如连接超时时间、启用/禁用广播、启用/禁用KeepAlive等。
在传统的BIO的编程中, 我们创建了一个ServerSocket
后, 可以进行一些配置, 如下:
ServerSocket serverSocket = new ServerSocket( 8080 );
serverSocket.setReceiveBufferSize( 1024 * 64 );
serverSocket.setSoTimeout( 3000 );
ReceiveBufferSize
表示接收缓冲区的大小,Timeout
为超时时间
到了Nio中, 引入了Channel, ServerSocket
对应的就是ServerSocketChannel
, 我们不会再去创建ServerSocket
那么还想对套接字进行上述配置应该怎么处理?在ServerSocketChannel
中就有setOption
方法来进行设置
回到Netty中, 大家在日常使用中, 貌似很少会直接用Nio中的ServerSocketChannel
了, 因为Netty又进一步进行了封装, 比如说新增了一个NioServerSocketChannel
, 里面有两个属性, 一个是Nio中的ServerSocketChannel
, 一个是NioServerSocketChannelConfig
, 很清晰, Netty利用NioServerSocketChannel
保存了ServerSocketChannel
及其对应的配置NioServerSocketChannelConfig
, 既然是Config自然就会有很多的key-value来表示配置信息,比如说: private Map<String, Object> config
;我要设置receiveBufferSize
, 那么我就要config.put( "receiveBufferSize", 1024 );
这有一个弊端, 让使用者需要手动输入字符串key, 而且, 如果key需要有些特殊的属性等信息, 也没法实现, 这些key通常是固定的, 也就是我们所说的常量, 于是Netty为了能够让key拥有更多的功能, 而不是仅仅用字符串来表示, 就出现了ChannelOption
可以简单分析一下这几个类:
Constant
是一个常量接口, 定义了id和name方法
AbstractConstant
对Constant
中的id和name方法进行了实现, 其实就等于get方法而已, 可以看到, 每一个常量对象都有一个name, 这个name就是这个常量对象的字符串表示形式, 更加具体一点, 之前我们设置到ServerSocketChanne
中的receiveBufferSize
这个配置, 就对应一个常量对象, 常量对象的name就是receiveBufferSize
, id是一个随机生成的值, 用来表示唯一性
分析到了AbstractConstant
, 抽象类, 定义了所有情况下的常量的公共信息id和name再往后, 要配置Channel
, 那么就出现了ChannelOption
, 如果要配置数据库就可能会出现DatabaseOption
(实际上并没有),于是ChannelOption
表示用于Channel
配置信息中的key
, 我们看到这个ChannelOption
的源码, 一开始出现了一个pool,池子, 这个是核心常量池, 这个池子的作用是用来保存常量的, 假设我有许多的Channel配置, 那么我自然就需要有许多的ChannelOption
, 由于配置的key一定是固定的, 那么在使用的时候, 如果要更改配置信息, 总不能每次都new一个ChannelOption
来表示配置信息的key吧, 于是, 这些key就被保存到了常量
池中, ConstantPool
就是用来保存这些配置对应的key的, ConstantPool
是一个抽象类, 里面就一个ConcurrentMap
来保存, ConcurrentMap
的key一定是字符串, value是泛型。
父类的ConcurrentMap
是公共需要的, 但是ConcurrentMap
里面存的值却需要子类来定义, 同时实现newConstant
方法表示这个池子中常量的创建(在ChannelOption
类中实现)
上面是存储常量的过程,constants
就是那个ConcurrentMap
, 通过key为SO_RCVBUF
去这个map查找对应的ChannelOption
如果没找到, 那么就调用newConstant
创建一个该常量, 并且放入到ConcurrentMap
中, newConstant
由子类来觉得创建的是哪个类型的常量, ChannelOption
有一个匿名内部类是ConstantPool
的子类, 之前我们也看过了,其创建的是ChannelOption
常用通道选项
- SO_BACKLOG
用于指定服务端连接队列长度,当服务器连接处理线程全忙时,已完成三次握手的请求会被临时存放在连接队列中等待被accept,队列满后会拒绝新收到的连接请求
如不设置,该值默认为200左右,对于连接数不太多的场景,默认值就够了,像常见RPC框架的服务端(如dubbo)就没有设置,RocketMq需要支持更高频的连接请求,所以使用了推荐值1024
- SO_REUSEADDR
TCP四次挥手的最后阶段,主动发起关闭的一端会处于TIME_WAIT
状态,该状态下的socket所用端口无法被复用(默认时间2MSL=4分钟);
在服务端客户端架构中,通常是服务端主动发起连接关闭,在大量连接的场景中,无论是频繁关闭连接和新建连接,还是服务端重启,都需要端口资源,4分钟太长了,不能忍。
SO_REUSEADDR=true
就是通知内核,如果端口忙,但socket状态是TIME_WAIT
,可以立即重用端口;因为端口资源限制,该配置算是服务端必备的了
- SO_KEEPALIVE
TCP有内置的连接保活机制,保活并不是把挂掉的连接整活,而是及时发现并释放无效连接资源,只留下活跃的连接
由于netty提供的IdleStateHandler
可以非常方便、灵活的实现心跳维持和会话管理,所以一般不用TCP自带的KEEP_ALIVE
,这里就设置为false了
- TCP_NODELAY
TCP/IP协议中针对TCP默认开启了Nagle算法,会对包的发送进行限制,至少满足下面任意一个条件才可以发出
- 写缓冲区的字节数超过指定阈值
- 之前发出的所有包都已收到ack
其好处是减少网络开销和发送接收两端的压力,坏处就是存在发送延时,对于延时敏感型、数据传输量比较小的应用,应该选择关闭Nagle算法,关闭方式为设置TCP_NODELAY=true
- SO_SNDBUF 与 SO_RCVBUF
每个socket都有一个receive缓冲区和send缓冲区,在调用socket的read send时,其实仅仅是操作缓冲区
read仅仅从receive缓冲区拿数据,拿不到就阻塞等待对端数据从网络过来。send仅仅将数据放入send缓冲区,缓冲区满了就等待
对于TCP协议来说,由于有滑动窗口控制,如果接收端read缓冲区已满,还未来的及处理,则会在回复发送端的ack中缩小窗口值,提示发送端降低发送速度,甚至暂时不发,缓解自身的压力
所以说缓冲区的大小影响的是两端发送接收的速度和压力,没有一个标准的参考值,实际生产时应该由测试结果决定,通常情况下,默认的值就够了,也就是不需要设置
- CONNECT_TIMEOUT_MILLS
连接超时时间,超过该时间还没连上服务端,则抛出timeout异常,也算是客户端的标配了由于客户端只与服务端建立少量的连接,所以不需要设SO_REUSEADDR
在设置时有两种方式
option(ChannelOption.
SO_BACKLOG
, 1024)
childOption(ChannelOption.TCP_NODELAY
,true)
childOption()
都是给workerGroup
option()
都是给bossGroup
option()
和handler()
是在server
启动时进行设置和调用,childHandler()
和childOption()
是在连接建立是设置和调用的
ByteBuf
从结构上来说,ByteBuf 由一串字节数组构成。数组中每个字节用来存放信息。 ByteBuf 提供了两个索引,一个用于读取数据,一个用于写入数据。这两个索引通过在字节数组中移动,来定位需要读或者写信息的位置。
当从 ByteBuf 读取时,它的 readerIndex
(读索引)将会根据读取的字节数递增。 同样,当写 ByteBuf 时,它的 writerIndex
也会根据写入的字节数进行递增。
代码示例
public class NettyByteBuf {
public static void main(String[] args) {
// 创建byteBuf对象,该对象内部包含一个字节数组byte[10]
// 通过readerindex和writerIndex和capacity,将buffer分成三个区域
// 已经读取的区域:[0,readerindex)
// 可读取的区域:[readerindex,writerIndex)
// 可写的区域: [writerIndex,capacity)
ByteBuf byteBuf = Unpooled.buffer(10);
System.out.println("byteBuf=" + byteBuf);
for (int i = 0; i < 8; i++) {
byteBuf.writeByte(i);
}
System.out.println("byteBuf=" + byteBuf);
for (int i = 0; i < 5; i++) {
System.out.println(byteBuf.getByte(i));
}
System.out.println("byteBuf=" + byteBuf);
for (int i = 0; i < 5; i++) {
System.out.println(byteBuf.readByte());
}
System.out.println("byteBuf=" + byteBuf);
//用Unpooled工具类创建ByteBuf
ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,zhuge!", CharsetUtil.UTF_8);
//使用相关的方法
if (byteBuf2.hasArray()) {
byte[] content = byteBuf2.array();
//将 content 转成字符串
System.out.println(new String(content, CharsetUtil.UTF_8));
System.out.println("byteBuf=" + byteBuf2);
System.out.println(byteBuf2.readerIndex()); // 0
System.out.println(byteBuf2.writerIndex()); // 12
System.out.println(byteBuf2.capacity()); // 36
System.out.println(byteBuf2.getByte(0)); // 获取数组0这个位置的字符h的ascii码,h=104
int len = byteBuf2.readableBytes(); //可读的字节数 12
System.out.println("len=" + len);
//使用for取出各个字节
for (int i = 0; i < len; i++) {
System.out.println((char) byteBuf2.getByte(i));
}
//范围读取
System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8));
System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8));
}
}
}
其中Unpooled
只是一个工具类,本质上与ByteBufAllocator
没有区别
直接内存与堆内存
直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,某些情况下这部分内存也 会被频繁地使用,而且也可能导致OutOfMemoryError
异常出现。Java里用DirectByteBuffer
可以分配一块直接内存(堆外内存),元空间对应的内存也叫作直接内存,它们对应的都是机器的物理内存。
通过该方法创建的ByteBuf,使用的是基于直接内存的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
可以使用下面的代码来创建池化基于堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
使用直接内存的优缺点:
优点:
- 不占用堆内存空间,减少了发生GC的可能
- java虚拟机实现上,本地IO会直接操作直接内存(直接内存=>系统调用=>硬盘/网卡),而非直接内存则需要二次拷贝(堆内 存=>直接内存=>系统调用=>硬盘/网卡)
缺点:
- 初始分配较慢
- 没有JVM直接帮助管理内存,容易发生内存溢出。为了避免一直没有FULL GC,最终导致直接内存把物理内存耗完。我们可以 指定直接内存的最大值,通过
-XX:MaxDirectMemorySize
来指定,当达到阈值的时候,调用system.gc
来进行一次FULL GC,间 接把那些没有被使用的直接内存回收掉。
ByteBuf内存池设计
随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量级的工作。但是对于缓冲区Buffer(相当于一个内存块),情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作。为了尽量重用缓冲区,Netty提供了基于ByteBuf内存池的缓冲区重用机制。需要的时候直接从池子里获取ByteBuf使用即可,使用完毕之后就重新放回到池子里去。
PooledByteBufAllocator
的newDirectBuffer
方法,从Cache
中获取内存区域PoolArena
,调用它的allocate
方法进行内存分配
PoolArena
的allocate
方法如下:
重点分析newByteBuf
的实现,它同样是个抽象方法,由子类DirectArena
和HeapArena
来实现不同类型的缓冲区分配
这里使用的是直接内存,因此重点分析DirectArena
的实现
最终执行了PooledUnsafeDirectByteBuf
的newInstance
方法
通过RECYCLER的get方法循环使用ByteBuf对象,如果是非内存池实现,则直接创建一个新的ByteBuf对象。
ByteBuf扩容机制
如果我们需要了解ByteBuf的扩容,我们需要先了解ByteBuf中定义的几个成员变量
-
minNewCapacity
:用户需要写入的值大小 -
threshold
:阈值,为Bytebuf内部设定容量的最大值 -
maxCapacity
:Netty最大能接受的容量大小,一般为int的最大值
核心扩容方法
进入Bytebuf的源码可以看到详细的扩容过程
Bytebuf(抽象类) -> writeByte(抽象方法) -> AbstractByteBuf(抽象类) -> writeByte(抽象方法) -> ensureWritable0(关键点) -> calculateNewCapacity(扩容方法) -> AbstractByteBufAllocator(抽象类)
- 判断目标值与阈值threshold(4MB)的大小关系,等于直接返回阈值
- 采用步进4MB的方式完成扩容
- 采用64为基数,做倍增的方式完成扩容
Netty的ByteBuf需要动态扩容来满足需要,扩容过程: 默认门限阈值为4MB(这个阈值是一个经验值,不同场景,可能取 值不同),当需要的容量等于门限阈值,使用阈值作为新的缓存区容量 目标容量,如果大于阈值,采用每次步进4MB的方式进行 内存扩张((需要扩容值/4MB)*4MB),扩张后需要和最大内存(maxCapacity)进行比较,大于maxCapacity的话就用 maxCapacity,否则使用扩容值目标容量,如果小于阈值,采用倍增的方式,以64(字节)作为基本数值,每次翻倍增长64 — >128 –> 256,直到倍增后的结果大于或等于需要的容量值。
Netty零拷贝
Netty的接收和发送ByteBuf采用DIRECT BUFFERS
,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。 如果使用传统的JVM堆内存(HEAP BUFFERS
)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才能写入Socket 中。JVM堆内存的数据是不能直接写入Socket中的。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
Netty编解码
Netty涉及到编解码的组件有Channel、ChannelHandler、ChannelPipe
等,先大概了解下这几个组件的作用。
ChannelHandler
ChannelHandler
充当了处理入站和出站数据的应用程序逻辑容器。例如,实现ChannelInboundHandler
接口(或 ChannelInboundHandlerAdapter
),你就可以接收入站事件和数据,这些数据随后会被你的应用程序的业务逻辑处理。当你要给连接 的客户端发送响应时,也可以从ChannelInboundHandler
冲刷数据。你的业务逻辑通常写在一个或者多个ChannelInboundHandler
中。 ChannelOutboundHandler
原理一样,只不过它是用来处理出站数据的。
ChannelPipeline
ChannelPipeline
提供了ChannelHandler
链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline
中的一系列ChannelOutboundHandler
(ChannelOutboundHandler
调用是从tail到head方向逐个调用每个handler的逻辑),并被这些Handler
处理,反之则称为入站的,入站只调用pipeline
里的ChannelInboundHandler
逻辑(ChannelInboundHandler
调用是从head到tail方向逐个调用每个handler的逻辑)。
Netty 的编(解)码器实现了 ChannelHandlerAdapter
,也是一种特殊的 ChannelHandler
,所以依赖于 ChannelPipeline
,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。
Netty里面的编解码: 解码器:负责处理“入站 InboundHandler
”数据。 编码器:负责“出站 OutboundHandler
” 数据。
解码器(Decoder)
解码器负责 解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler
的实现。实践中使用解码器很简单,就是将入站数据转换格式后
传递到ChannelPipeline
中的下一个ChannelInboundHandler
进行处理;这样的处理时很灵活的,我们可以将解码器放在ChannelPipeline
中,重用逻辑。
对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder
和MessageToMessageDecoder
-
ByteToMessageDecoder
: 用于将字节转为消息,需要检查缓冲区是否有足够的字节 -
ReplayingDecoder
: 继承ByteToMessageDecoder
,不需要检查缓冲区是否有足够的字节,但是ReplayingDecoder
速度略慢于ByteToMessageDecoder
,同时不是所有的ByteBuf都支持。选择:项目复杂性高则使用ReplayingDecoder
,否则使用ByteToMessageDecoder
-
MessageToMessageDecoder
: 用于从一种消息解码为另外一种消息(例如POJO到POJO)
ByteToMessageDecoder
解码器
用于将接收到的二进制数据(Byte)解码,得到完整的请求报文(Message)。
ByteToMessageDecoder
是一种ChannelInboundHandler
,可以称为解码器,负责将byte字节流住(ByteBuf
)转换成一种Message
,Message
是应用可以自己定义的一种Java对象。
下面列出了ByteToMessageDecoder
两个主要方法:
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)//这个方法是唯一的一个需要自己实现的抽象方法,作用是将ByteBuf数据解码成其他形式的数据。
decodeLast(ChannelHandlerContext, ByteBuf, List<Object>),//实际上调用的是decode(...)。
参数的作用如下:
-
Bytubuf
:需要解码的二进制数据。 -
List<Object>
:解码后的有效报文列表,我们需要将解码后的报文添加到这个List中。之所以使用一个List表示,是因为考虑到粘包问题,因此入参的in中可能包含多个有效报文。
当然,也有可能发生了拆包,in中包含的数据还不足以构成一个有效报文,此时不往List中添加元素即可。
另外特别要注意的是,在解码时,不能直接调用ByteBuf的readXXX方法来读取数据,而是应该首先要判断能否构成一个有效的报文。
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= 4) {
out.add(in.readInt());
} }
}
只有在可读字节数>=4的情况下,我们才进行解码,即读取一个int,并添加到List中。
在可读字节数小于4的情况下,我们并没有做任何处理,假设剩余可读字节数为3,不足以构成1个int。那么父类ByteToMessageDecoder
发现这次解码List中的元素没有变化,
则会对in中的剩余3个字节进行缓存,等待下1个字节的到来,之后再回到调用ToIntegerDecoder
的decode
方法。
另外需要注意: 在ToIntegerDecoder
的decode
方法中,每次最多只读取一个1个int。如果ByteBuf中的字节数很多,例如为16,那么可以构成4个int,而这里只读取了1个int,那么剩余12字节怎么办?这个其实不用担心,ByteToMessageDecoder
再每次回调子类的decode
方法之后,都会判断输入的ByteBuf
中是否还有剩余字节可读,如果还有,会再次回调子类的decode
方法,直到某个回调decode
方法List中的元素个数没有变化时才停止,元素个数没有变化,实际上意味着子类已经没有办法从剩余的字节中读取一个有效报文。由于存在剩余可读字节时,ByteToMessageDecoder
会自动再次回调子类decode
方法,因此建议在实现ByteToMessageDecoder
时,decode
方法每次只解析一个有效报文即可,没有必要一次全部解析出来。
ByteToMessageDecoder
提供的一些常见的实现类:
-
FixedLengthFrameDecoder
:定长协议解码器,我们可以指定固定的字节数算一个完整的报文 -
LineBasedFrameDecoder
: 行分隔符解码器,遇到\n或者\r\n,则认为是一个完整的报文 -
DelimiterBasedFrameDecoder
: 分隔符解码器,与LineBasedFrameDecoder
类似,只不过分隔符可以自己指定 -
LengthFieldBasedFrameDecoder
:长度编码解码器,将报文划分为报文头/报文体,根据报文头中的Length字段确定报文体的长度,因此报文提的长度是可变的 -
JsonObjectDecoder
:json格式解码器,当检测到匹配数量的”{” 、”}”或”[””]”时,则认为是一个完整的json对象或者json数组。
这些实现类,都只是将接收到的二进制数据,解码成包含完整报文信息的ByteBuf
实例后,就直接交给了之后的ChannelInboundHandler
处理。
ReplayingDecoder
解码器
ReplayingDecoder
是byte-to-message
解码的一种特殊的抽象基类,byte-to-message
解码读取缓冲区的数据之前需要检查缓冲区是否有足够的字节,使用ReplayingDecoder
就无需自己检查;若ByteBuf
中有足够的字节,则会正常读取;若没有足够的字节则会停止解码。
- 不是所有的操作都被
ByteBuf
支持,如果调用一个不支持的操作会抛出DecoderException
。 -
ByteBuf.readableBytes()
大部分时间不会返回期望值
如果你能忍受上面列出的限制,相比ByteToMessageDecoder
,你可能更喜欢ReplayingDecoder
。在满足需求的情况下推荐使用ByteToMessageDecoder
,因为它的处理比较
简单,没有ReplayingDecoder
实现的那么复杂。ReplayingDecoder
继承于ByteToMessageDecoder
,所以他们提供的接口是相同的。下面代码是ReplayingDecoder
的实现:
/**
* Integer解码器,ReplayingDecoder实现
*/
public class ToIntegerReplayingDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(in.readInt());
}
}
MessageToMessageDecoder
ByteToMessageDecoder
是将二进制流进行解码后,得到有效报文。而MessageToMessageDecoder
则是将一个本身就包含完整报文信息的对象转换成另一个Java对象。
举例: 前面介绍了ByteToMessageDecoder
的部分子类解码后,会直接将包含了报文完整信息的ByteBuf
实例交由之后的ChannelInboundHandler
处理,此时,你可以在ChannelPipeline
中,再添加一个MessageToMessageDecoder
,将ByteBuf
中的信息解析后封装到Java对象中,简化之后的ChannelInboundHandler
的操作。
另外: 一些场景下,有可能你的报文信息已经封装到了Java对象中,但是还要继续转成另外的Java对象,因此一个MessageToMessageDecoder
后面可能还跟着另一个MessageToMessageDecoder
。一个比较容易的理解的类比案例是Java Web编程,通常客户端浏览器发送过来的二进制数据,已经被web容器(如tomcat)解析成了一个HttpServletRequest
对象,但是我们还是需要将HttpServletRequest
中的数据提取出来,封装成我们自己的POJO类,也就是从一个Java对象(HttpServletRequest
)转换成另一个Java对象(我们的POJO类)。
MessageToMessageDecoder
的类声明如下:
/**
* 其中泛型参数I表示我们要解码的消息类型。例前面,我们在ToIntegerDecoder中,把二进制字节流转换成了一个int类型的整数。
*/
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
类似的,MessageToMessageDecoder
也有一个decode
方法需要覆盖 ,如下:
/**
* 参数msg,需要进行解码的参数。例如ByteToMessageDecoder解码后的得到的包含完整报文信息ByteBuf
* List<Object> out参数:将msg经过解析后得到的java对象,添加到放到List<Object> out中
*/
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
例如,现在我们想编写一个IntegerToStringDecoder
,把前面编写的ToIntegerDecoder
输出的int参数转换成字符串,此时泛型I就应该是Integer类型。
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
@Override
public void decode(ChannelHandlerContext ctx, Integer msg List<Object> out) throws Exception {
out.add(String.valueOf(msg));
}
}
此时我们应该按照如下顺序组织ChannelPipieline
中ToIntegerDecoder
和IntegerToStringDecoder
的关系:
ChannelPipieline ch=....
ch.addLast(new ToIntegerDecoder());
ch.addLast(new IntegerToStringDecoder());
也就是说,前一个ChannelInboudHandler
输出的参数类型,就是后一个ChannelInboudHandler
的输入类型。
特别注意,如果我们指定MessageToMessageDecoder
的泛型参数为ByteBuf
,表示其可以直接针对ByteBuf
进行解码,那么其是否能替代ByteToMessageDecoder
呢?
答案是不可以的。因为ByteToMessageDecoder
除了进行解码,还要会对不足以构成一个完整数据的报文拆包数据(拆包)进行缓存。而MessageToMessageDecoder
则没有这样的逻辑。
因此通常的使用建议是,使用一个ByteToMessageDecoder
进行粘包、拆包处理,得到完整的有效报文的ByteBuf实例,然后交由之后的一个或者多个MessageToMessageDecoder
对ByteBuf实例中的数据进行解析,转换成POJO类。
编码器(Encoder)
与ByteToMessageDecoder
和MessageToMessageDecoder
相对应,Netty提供了对应的编码器实现MessageToByteEncoder
和MessageToMessageEncoder
,
二者都实现ChannelOutboundHandler
接口。
相对来说,编码器比解码器的实现要更加简单,原因在于解码器除了要按照协议解析数据,还要要处理粘包、拆包问题;而编码器只要将数据转换成协议规定的二进制格式发送即可。
抽象类MessageToByteEncoder
MessageToByteEncoder
也是一个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息转换成二进制流放入ByteBuf
中。子类通过覆写其抽象方法encode来实现编码,如下所示:
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
....
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
}
可以看到,MessageToByteEncoder
的输出对象out是一个ByteBuf
实例,我们应该将泛型参数msg包含的信息写入到这个out对象中。
MessageToByteEncoder
使用案例:
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
out.writeInt(msg);//将Integer转成二进制字节流写入ByteBuf中
}
}
抽象类MessageToMessageEncoder
MessageToMessageEncoder
同样是一个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息放到一个List中。子类通过覆写其抽象方法encode,来实现编码,如下所示:
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
...
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
...
}
与MessageToByteEncoder
不同的,MessageToMessageEncoder
编码后的结果放到的out参数类型是一个List中。例如,你一次发送2个报文,因此msg参数中实际上包含了2个报文,因此应该解码出两个报文对象放到List中。
MessageToMessageEncoder
提供的常见子类包括:
-
LineEncoder
:按行编码,给定一个CharSequence(如String)
,在其之后添加换行符\n或者\r\n,并封装到ByteBuf进行输出,与LineBasedFrameDecoder
相对应。 -
Base64Encoder
:给定一个ByteBuf
,得到对其包含的二进制数据进行Base64编码后的新的ByteBuf进行输出,与Base64Decoder
相对应。 -
LengthFieldPrepender
:给定一个ByteBuf
,为其添加报文头Length字段,得到一个新的ByteBuf进行输出。Length
字段表示报文长度,与LengthFieldBasedFrameDecoder
相对应。 -
StringEncoder
:给定一个CharSequence
(如:StringBuilder、StringBuffer、String
等),将其转换成ByteBuf进行输出,与StringDecoder
对应。
这些MessageToMessageEncoder
实现类最终输出的都是ByteBuf,因为最终在网络上传输的都要是二进制数据。
编码解码器Codec
编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler
和ChannelOutboundHandler
接口,因此在数据输入和输出时都能进行处理。
Netty提供提供了一个ChannelDuplexHandler
适配器类,编码解码器的抽象基类 ByteToMessageCodec 、MessageToMessageCodec
都继承与此类,如下:
ByteToMessageCodec
内部维护了一个ByteToMessageDecoder
和一个MessageToByteEncoder
实例,可以认为是二者的功集合,泛型参数I是接受的编码类型:
public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler {
private final TypeParameterMatcher outboundMsgMatcher;
private final MessageToByteEncoder<I> encoder;
private final ByteToMessageDecoder decoder = new ByteToMessageDecoder(){…}
...
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
...
}
MessageToMessageCodec
内部维护了一个MessageToMessageDecoder
和一个MessageToMessageEncoder
实例,可以认为是二者的功集合,泛型参数
INBOUND_IN
和OUTBOUND_IN
分别表示需要解码和编码的数据类型。
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {
private final MessageToMessageEncoder<Object> encoder= ...
private final MessageToMessageDecoder<Object> decoder =…
...
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out) throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out) throws Exception;
}
其他编解码方式,建立联合编解码器
使用编解码器来充当编码器和解码器的组合失去了单独使用编码器或解码器的灵活性,编解码器是要么都有要么都没有。其实还可以让编码器和解码器在ChannelPipeline
中作为一个逻辑单元。Netty提供了一种解决方案,使用CombinedChannelDuplexHandler
。虽然这个类不是编解码器API的一部分,但是它经常被用来建立一个编解码器。
/**
* 解码器,将byte转成char
*/
public class ByteToCharDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
while(in.readableBytes() >= 2){
out.add(Character.valueOf(in.readChar()));
}
}
/**
* 编码器,将char转成byte
*/
public class CharToByteEncoder extends MessageToByteEncoder<Character> {
@Override
protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
out.writeChar(msg);
}
}
/**
* 继承CombinedChannelDuplexHandler,用于绑定解码器和编码器
*/
public class CharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
public CharCodec(){
super(new ByteToCharDecoder(), new CharToByteEncoder());
}
}
使用CombinedChannelDuplexHandler
绑定解码器和编码器很容易实现,比使用*Codec
更灵活
ChannelInboundHandlerAdapter
处理器
除了使用内置的处理器外,往往还需要根据自己的设计来实现一些处理器,相比上面的类,ChannelInboundHandlerAdapter
往往使用的更多一点。
ChannelInboundHandler
实现的抽象基类,该实现提供了其所有方法的实现。 这个实现只是将操作转发给ChannelPipeline
中的下一个ChannelHandler
。子类可以重写方法实现来改变这一点。 注意,在channelRead(ChannelHandlerContext, Object)
方法自动返回后,消息不会被释放。如果您正在寻找自动释放接收到的消息的ChannelInboundHandler
实现,请参阅SimpleChannelInboundHandler
。
我们通常会继承此类,覆写方法,加入自己的逻辑处理。
-
handlerAdded
: 新建立的连接会按照初始化策略,把handler
添加到该channel
的pipeline
里面,也就是channel.pipeline.addLast(new LifeCycleInBoundHandler)
执行完成后的回调 -
channelRegistered
: 当该连接分配到具体的worker线程后,该回调会被调用。 -
channelActive
:channel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线上上,说明该channel准备就绪,可以使用了。 -
channelRead
:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读; -
channelReadComplete
:服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕; -
channelInactive
:当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了。 -
channelUnRegistered
: 对应channelRegistered
,当连接关闭后,释放绑定的workder线程; -
handlerRemoved
: 对应handlerAdded
,将handler从该channel的pipeline移除后的回调方法。
/**
* handler的生命周期回调接口调用顺序:
* handlerAdded -> channelRegistered -> channelActive -> channelRead -> channelReadComplete
* -> channelInactive -> channelUnRegistered -> handlerRemoved
*
* handlerAdded: 新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,也就是channel.pipeline.addLast(new LifeCycleInBoundHandler)执行完成后的回调;
* channelRegistered: 当该连接分配到具体的worker线程后,该回调会被调用。
* channelActive:channel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线上上,说明该channel准备就绪,可以使用了。
* channelRead:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读;
* channelReadComplete:服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕;
* channelInactive:当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了。
* channelUnRegistered: 对应channelRegistered,当连接关闭后,释放绑定的workder线程;
* handlerRemoved: 对应handlerAdded,将handler从该channel的pipeline移除后的回调方法。
*/
public class LifeCycleInBoundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelRegistered: channel注册到NioEventLoop");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelUnregistered: channel取消和NioEventLoop的绑定");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelActive: channel准备就绪");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelInactive: channel被关闭");
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("channelRead: channel中有可读的数据" );
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelReadComplete: channel读数据完成");
super.channelReadComplete(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx)
throws Exception {
System.out.println("handlerAdded: handler被添加到channel的pipeline");
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx)
throws Exception {
System.out.println("handlerRemoved: handler从channel的pipeline中移除");
super.handlerRemoved(ctx);
}
}
需要注意的是,覆写方法后,在结尾一定要加上 ctx.fireChannelRead(msg);
,否则消息不会继续传递,这是使用时易犯的错误。
SimpleChannelInboundHandler
处理器
SimpleChannelInboundHandler
则是最常用的被继承类,这个类有个好处是支持泛型,不像ChannelInboundHandler
,消息用Object传递,需要做类型强制转换。
我们通常只需要覆写channelRead0
抽象方法,SimpleChannelInboundHandler
实际是继承自上面ChannelInboundHandlerAdapter
了,覆写了channelRead
方法,并加入了以下特殊处理 ReferenceCountUtil.release(msg)
,即自动释放消息。
如果使用这个类,并且后续的处理器中仍需要读取消息,则必须手工调用ReferenceCountUtil.retain(msg)
,也就是让消息的引用计数加1,否则框架对引用计数为0的消息会执行释放和回收。
-
ChannelInboundHandlerAdapter
通常用于处于链条中间的某些环节处理,对数据进行某些处理,如数据验证,需要将消息继续传递。 -
SimpleChannelInboundHandler
则比较适合链条最后一个环节,该环节处理完后,后续不再需要该消息,因此可以自动释放。
protostuff
序列化工具
如果要实现高效的编解码可以用protobuf
,但是protobuf
需要维护大量的proto文件比较麻烦,现在一般可以使用protostuff
。 protostuff
是一个基于protobuf
实现的序列化方法,它较于protobuf
最明显的好处是,在几乎不损耗性能的情况下做到了不用我们写.proto文件来实现序列化。使用它也非常简单,代码如下:
<!-- protostuff依赖包 begin -->
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-api</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.0.10</version>
</dependency>
<!-- protostuff依赖包 end -->
/**
* protostuff 序列化工具类,基于protobuf封装
*/
public class ProtostuffUtil {
private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
private static <T> Schema<T> getSchema(Class<T> clazz) {
@SuppressWarnings("unchecked")
Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
if (schema == null) {
schema = RuntimeSchema.getSchema(clazz);
if (schema != null) {
cachedSchema.put(clazz, schema);
}
}
return schema;
}
/**
* 序列化
*
* @param obj
* @return
*/
public static <T> byte[] serializer(T obj) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(clazz);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
* 反序列化
*
* @param data
* @param clazz
* @return
*/
public static <T> T deserializer(byte[] data, Class<T> clazz) {
try {
T obj = clazz.newInstance();
Schema<T> schema = getSchema(clazz);
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
public static void main(String[] args) {
byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhuge"));
User user = ProtostuffUtil.deserializer(userBytes, User.class);
System.out.println(user);
}
}
Netty粘包拆包
TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。 如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。
解决方案
- 消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格
- 在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不 能出现分隔符。
- 发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。
Netty提供了多个解码器,可以进行分包的操作,如下:
-
LineBasedFrameDecoder
(回车换行分包) -
DelimiterBasedFrameDecoder
(特殊分隔符分包) -
FixedLengthFrameDecoder
(固定长度报文来分包)
代码示例(自定义解析)
自定义协议
/**
* 自定义协议包
*/
public class MyMessageProtocol {
//定义一次发送包体长度
private int len;
//一次发送包体内容
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
客户端
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageEncoder());
pipeline.addLast(new MyClientHandler());
}
});
System.out.println("netty client start。。");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被调用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i = 0; i< 2; i++) {
String msg = "你好,我是张三!";
//创建协议包对象
MyMessageProtocol messageProtocol = new MyMessageProtocol();
messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);
messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
服务端
public class MyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageDecoder());
pipeline.addLast(new MyServerHandler());
}
});
System.out.println("netty server start。。");
ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class MyMessageDecoder extends ByteToMessageDecoder {
int length = 0;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println();
System.out.println("MyMessageDecoder decode 被调用");
//需要将得到二进制字节码-> MyMessageProtocol 数据包(对象)
System.out.println(in);
if(in.readableBytes() >= 4) {
if (length == 0){
length = in.readInt();
}
if (in.readableBytes() < length) {
System.out.println("当前可读数据不够,继续等待。。");
return;
}
byte[] content = new byte[length];
if (in.readableBytes() >= length){
in.readBytes(content);
//封装成MyMessageProtocol对象,传递到下一个handler业务处理
MyMessageProtocol messageProtocol = new MyMessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}
length = 0;
}
}
}
public class MyServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
System.out.println("====服务端接收到消息如下====");
System.out.println("长度=" + msg.getLen());
System.out.println("内容=" + new String(msg.getContent(), CharsetUtil.UTF_8));
System.out.println("服务端接收到消息包数量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
运行结果
netty server start。。
MyMessageDecoder decode 被调用
PooledUnsafeDirectByteBuf(ridx: 0, widx: 56, cap: 1024)
====服务端接收到消息如下====
长度=24
内容=你好,我是张三!
服务端接收到消息包数量=1
MyMessageDecoder decode 被调用
PooledUnsafeDirectByteBuf(ridx: 28, widx: 56, cap: 1024)
====服务端接收到消息如下====
长度=24
内容=你好,我是张三!
服务端接收到消息包数量=2
Netty心跳检测
所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 在 Netty 中, 实现心跳机制的关键是 IdleStateHandler
, 看下它的构造器:
-
readerIdleTimeSeconds
: 读超时. 即当在指定的时间间隔内没有从Channel
读取到数据时, 会触发一个READER_IDLE
的IdleStateEvent
事件. -
writerIdleTimeSeconds
: 写超时. 即当在指定的时间间隔内没有数据写入到Channel
时, 会触发一个WRITER_IDLE
的IdleStateEvent
事件. -
allIdleTimeSeconds
: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个ALL_IDLE
的IdleStateEvent
事件.
这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法。
要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer
中加入如下的代码:
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
IdleStateHandler
里最核心的是下面的代码,不停的去执行这个几个Task
心跳代码示例
客服端
public class HeartBeatClient {
public static void main(String[] args) throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatClientHandler());
}
});
System.out.println("netty client start。。");
Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
String text = "Heartbeat Packet";
Random random = new Random();
while (channel.isActive()) {
int num = random.nextInt(8);
Thread.sleep(num * 1000);
channel.writeAndFlush(text);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(" client received :" + msg);
if (msg != null && msg.equals("idle close")) {
System.out.println(" 服务端关闭连接,客户端也关闭");
ctx.channel().closeFuture();
}
}
}
}
服务端
public class HeartBeatServer {
public static void main(String[] args) throws Exception {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,
//会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须
//实现userEventTriggered方法处理对应事件
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new HeartBeatServerHandler());
}
});
System.out.println("netty server start。。");
ChannelFuture future = bootstrap.bind(9000).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
int readIdleTimes = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(" ====== > [server] message received : " + s);
if ("Heartbeat Packet".equals(s)) {
ctx.channel().writeAndFlush("ok");
} else {
System.out.println(" 其他信息处理 ... ");
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
readIdleTimes++; // 读空闲的计数加1
break;
case WRITER_IDLE:
eventType = "写空闲";
// 不处理
break;
case ALL_IDLE:
eventType = "读写空闲";
// 不处理
break;
}
System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
if (readIdleTimes > 3) {
System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
ctx.channel().writeAndFlush("idle close");
ctx.channel().close();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
}
}
运行结果
netty client start。。
client received :ok
client received :ok
client received :idle close
服务端关闭连接,客户端也关闭
netty server start。。
=== /127.0.0.1:50521 is active ===
====== > [server] message received : Heartbeat Packet
/127.0.0.1:50521超时事件:读空闲
/127.0.0.1:50521超时事件:读空闲
====== > [server] message received : Heartbeat Packet
/127.0.0.1:50521超时事件:读空闲
/127.0.0.1:50521超时事件:读空闲
[server]读空闲超过3次,关闭连接,释放更多资源
除了最核心的IdleStateHandler
,Netty还提供了基于该类的ReadTimeoutHandler
,WriteTimeoutHandler
名称 | 作用 |
---|---|
IdleStateHandler |
当连接空闲时间(读或写)太长时,将触发 IdleStateEvent 事件,可以通过 ChannelInboundHandler 中重写 userEventTrigged 方法来处理该事件。 |
ReadTimeoutHandler |
如果在指定的时间之内没有发生读事件,就会抛出这个异常,并且自动关闭连接。可以在 exectionCaught 方法中处理这个异常。 |
WriteTimeoutHandler |
如果在指定的时间之内没有发生写事件,抛出异常,并且关闭连接。可以在 exectionCaught 方法中处理这个异常。 |
ChannelPipeline pipeline = ch.pipeline();
//5秒钟没有读事件,则断开连接
pipeline.addLast(new ReadTimeoutHandler(5, TimeUnit.SECONDS));
//5秒钟没有写事件,则断开连接
pipeline.addLast(new WriteTimeoutHandler(5, TimeUnit.SECONDS));
//解码器
pipeline.addLast(new StringDecoder());
//编码器
pipeline.addLast(new StringEncoder());
//业务Handler
pipeline.addLast(new HeartBeanHandler());
public class HeartBeanHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead>>>"+msg+">>>"+ LocalDateTime.now());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught>>>"+cause.getMessage());
}
}
Netty断线重连
- 客户端启动连接服务端时,如果网络或服务端有问题,客户端连接失败,可以重连,重连的逻辑加在客户端。
- 系统运行过程中网络故障或服务端故障,导致客户端与服务端断开连接了也需要重连,可以在客户端处理数据的
Handler
的channelInactive
方法中进行重连。
代码示例
客户端
/**
* 实现了重连的客户端
*/
public class NettyClient {
private String host;
private int port;
private Bootstrap bootstrap;
private EventLoopGroup group;
public static void main(String[] args) throws Exception {
NettyClient nettyClient = new NettyClient("localhost", 9000);
nettyClient.connect();
}
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
init();
}
private void init() {
//客户端需要一个事件循环组
group = new NioEventLoopGroup();
//创建客户端启动对象
// bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可.
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
ch.pipeline().addLast(new NettyClientHandler(NettyClient.this));
}
});
}
public void connect() throws Exception {
System.out.println("netty client start。。");
//启动客户端去连接服务器端
ChannelFuture cf = bootstrap.connect(host, port);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
//重连交给后端线程执行
future.channel().eventLoop().schedule(() -> {
System.err.println("重连服务端...");
try {
connect();
} catch (Exception e) {
e.printStackTrace();
}
}, 3000, TimeUnit.MILLISECONDS);
} else {
System.out.println("服务端连接成功...");
}
}
});
//对通道关闭进行监听
cf.channel().closeFuture().sync();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private NettyClient nettyClient;
public NettyClientHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
//当通道有读取事件时会触发,即服务端发送数据给客户端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
}
// channel 处于不活动状态时调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("运行中断开重连。。。");
nettyClient.connect();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
服务端
public class NettyServer {
public static void main(String[] args) throws Exception {
// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来配置参数
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
// 使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对workerGroup的SocketChannel设置处理器
ch.pipeline().addLast(new LifeCycleInBoundHandler());
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start。。");
// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
// 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = bootstrap.bind(9000).sync();
// 给cf注册监听器,监听我们关心的事件
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});*/
// 等待服务端监听端口关闭,closeFuture是异步操作
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* handler的生命周期回调接口调用顺序:
* handlerAdded -> channelRegistered -> channelActive -> channelRead -> channelReadComplete
* -> channelInactive -> channelUnRegistered -> handlerRemoved
*
* handlerAdded: 新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,也就是channel.pipeline.addLast(new LifeCycleInBoundHandler)执行完成后的回调;
* channelRegistered: 当该连接分配到具体的worker线程后,该回调会被调用。
* channelActive:channel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线上上,说明该channel准备就绪,可以使用了。
* channelRead:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读;
* channelReadComplete:服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕;
* channelInactive:当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了。
* channelUnRegistered: 对应channelRegistered,当连接关闭后,释放绑定的workder线程;
* handlerRemoved: 对应handlerAdded,将handler从该channel的pipeline移除后的回调方法。
*/
public class LifeCycleInBoundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelRegistered: channel注册到NioEventLoop");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelUnregistered: channel取消和NioEventLoop的绑定");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelActive: channel准备就绪");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelInactive: channel被关闭");
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("channelRead: channel中有可读的数据" );
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelReadComplete: channel读数据完成");
super.channelReadComplete(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx)
throws Exception {
System.out.println("handlerAdded: handler被添加到channel的pipeline");
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx)
throws Exception {
System.out.println("handlerRemoved: handler从channel的pipeline中移除");
super.handlerRemoved(ctx);
}
}
/**
* 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕处理方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
运行结果
netty client start。。
服务端连接成功...
收到服务端的消息:HelloClient
服务端的地址: localhost/127.0.0.1:9000
运行中断开重连。。。
netty client start。。
netty client start。。
重连服务端...
netty client start。。
重连服务端...
重连服务端...
netty client start。。
netty client start。。
服务端连接成功...
重连服务端...
收到服务端的消息:HelloClient
服务端的地址: localhost/127.0.0.1:9000
netty server start。。
handlerAdded: handler被添加到channel的pipeline
channelRegistered: channel注册到NioEventLoop
channelActive: channel准备就绪
channelRead: channel中有可读的数据
服务器读取线程 nioEventLoopGroup-3-1
客户端发送消息是:HelloServer
channelReadComplete: channel读数据完成
Netty高并发高性能架构精髓
Reactor 模型
在研究netty的时候我们必定绕不过NIO的,也必定必须研究一下这个Reactor模型的,如果不进行这个Reactor模型和NIO知识点的研究,那么必定掌握不了Netty的精髓,Netty的模型是三种经典的Reactor模型演化过来的
Reactor
模型的核心流程为:注册事件 -> 扫描事件是否发生 -> 事件发生后做出相应的处理。
Reactor反应器模式由Reactor
反应器线程、Handlers
处理器两大角色组成,两大角色的职责分别如下:
-
Reactor
反应器线程的职责:负责响应IO事件,并且分发到Handlers
处理器。 -
Handlers
处理器的职责:非阻塞的执行业务处理逻辑。
Reactor
线程负责多路I/O事件的查询,然后分发到一个或者多个Handler处理器完成I/O 处理,所以,Reactor
模式也叫Dispatcher
模式。总之,Reactor模式和操作系统底层的IO多路复用模型相互结合
单线程Reactor模型
Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分派请求到Handler处理器中,Reactor和Hander 处于一条线程执行。
当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。
什么是单线程Reactor反应器
简单地说,Reactor反应器和Handers处理器 处于一个线程中执行。
基于Java NIO如何实现简单的单线程版本的反应器模式呢?需要用到SelectionKey
选择,键的几个重要的成员方法:
-
void attach(Object o)
将对象附加到选择键 此方法可以将任何的Java POJO对象,作为附件添加到SelectionKey
实例。这方法非常重 要,因为单线程版本的Reactor
反应器模式实现中,可以将Handler
处理器实例,作为附件添 加到SelectionKey
实例。 -
Object attachment()
从选择键获取附加对象 此方法与attach(Object o)
是配套使用的,其作用是取出之前通过attach(Object o)
方法添加到SelectionKey
选择键实例的附加对象。这个方法同样非常重要,当IO事件发生时,选择键将被select方法查询出来,可以直接将选择键的附件对象取出。
在Reactor
模式实现中,通过attachment()
方法所取出的,是之前通过attach(Object o)
方法绑定的Handler实例,然后通过该Handler实例,完成相应的传输处理。
总之,在反应器模式中,需要进行attach
和attachment
结合使用:在选择键注册完成之后, 调用attach
方法,将Handler
实例绑定到选择键;当IO事件发生时,调用attachment
方法,可以 从选择键取出Handler
实例,将事件分发到Handler
处理器中,完成业务处理。
单线程Reactor反应器模式-回显案例
EchoServer的功能很简单:读取客户端的输入,回显到客户端,所以也叫回显服务器。
基于Reactor反应器模式来实现,设计3个重要的类:
- 设计一个反应器类:
EchoServerReactor
类。 - 设计两个处理器类:
AcceptorHandler
新连接处理器、EchoHandler
回显处理器。
//反应器
class EchoServerReactor implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;
EchoServerReactor() throws IOException {
//Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk =
serverSocket.register(selector,0,new AcceptorHandler());
// SelectionKey.OP_ACCEPT
serverSocket.socket().bind(address);
Logger.info("服务端已经开始监听:"+address);
sk.interestOps(SelectionKey.OP_ACCEPT);
//attach callback object, AcceptorHandler
//sk.attach(new AcceptorHandler());
}
public void run() {
try {
while (!Thread.interrupted()) {
//io事件的查询
// 限时阻塞查询
selector.select(1000);
Set<SelectionKey> selected = selector.selectedKeys();
if (null == selected || selected.size() == 0) {
continue;
}
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
SelectionKey sk = it.next();
it.remove(); //避免下次重复处理
dispatch(sk);
}
// selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
//调用之前attach绑定到选择键的handler处理器对象
if (handler != null) {
handler.run();
}
}
// Handler:新连接处理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
Logger.info("接收到一个连接");
if (channel != null)
new EchoHandler(selector, channel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new Thread(new EchoServerReactor()).start();
}
}
第二个处理器为EchoHandler回显处理器,也是一个传输处理器,主要是完成客户端的 内容读取和回显
class EchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
EchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
//设置为非阻塞模式
c.configureBlocking(false);
//仅仅取得选择键,绑定事件处理器
// 后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将Handler作为选择键的附件
sk.attach(this);
//第二步,注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
// 唤醒事件查询线程,在单线程模式下,这里没啥意义
selector.wakeup();
}
public void run() {
try {
System.out.println("------");
if (state == SENDING) { //发送状态
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) { //开始的时候,为接收状态
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
sk.cancel();
try {
channel.finishConnect();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
它是一个单线程 版本的反应器模式,Reactor反应器和所有的Handler处理器实例的执行,都执行在同一条线程中。
运行EchoServerReactor
类中的main方法,可以启动回显服务器
客户端的代码,类名为EchoClient
public class EchoClient {
public void start() throws IOException {
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
// 1、获取通道(channel)
SocketChannel socketChannel = SocketChannel.open(address);
Logger.info("客户端连接成功");
// 2、切换成非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//不断的自旋、等待连接完成,或者做一些其他的事情
while (!socketChannel.finishConnect()) {
}
Logger.tcfo("客户端启动成功!");
//启动接受线程
Processor processor = new Processor(socketChannel);
Commander commander = new Commander(processor);
new Thread(commander).start();
new Thread(processor).start();
}
static class Commander implements Runnable {
Processor processor;
Commander(Processor processor) throws IOException {
//Reactor初始化
this.processor = processor;
}
public void run() {
while (!Thread.interrupted()) {
ByteBuffer buffer = processor.getSendBuffer();
Scanner scanner = new Scanner(System.in);
while (processor.hasData.get()) {
Logger.tcfo("还有消息没有发送完,请稍等");
ThreadUtil.sleepMilliSeconds(1000);
}
Logger.tcfo("请输入发送内容:");
if (scanner.hasNext()) {
String next = scanner.next();
buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
processor.hasData.set(true);
}
}
}
}
@Data
static class Processor implements Runnable {
ByteBuffer sendBuffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
ByteBuffer readBuffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
protected AtomicBoolean hasData = new AtomicBoolean(false);
final Selector selector;
final SocketChannel channel;
Processor(SocketChannel channel) throws IOException {
//Reactor初始化
selector = Selector.open();
this.channel = channel;
channel.register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isWritable()) {
if (hasData.get()) {
SocketChannel socketChannel = (SocketChannel) sk.channel();
sendBuffer.flip();
// 操作三:发送数据
socketChannel.write(sendBuffer);
sendBuffer.clear();
hasData.set(false);
}
}
if (sk.isReadable()) {
// 若选择键的IO事件是“可读”事件,读取数据
SocketChannel socketChannel = (SocketChannel) sk.channel();
int length = 0;
while ((length = socketChannel.read(readBuffer)) > 0) {
readBuffer.flip();
Logger.info("server echo:" + new String(readBuffer.array(), 0, length));
readBuffer.clear();
}
}
//处理结束了, 这里不能关闭select key,需要重复使用
//selectionKey.cancel();
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new EchoClient().start();
}
}
打印内容
[main|EchoClient.start] |> 客户端连接成功
[main|EchoClient.start] |> 客户端启动成功!
[Thread-0|EchoClient$Commander.run] |> 请输入发送内容:
123123
[Thread-0|EchoClient$Commander.run] |> 还有消息没有发送完,请稍等
[Thread-1|EchoClient$Processor.run] |> server echo:2023-03-01 18:16:27 >>123123
123[Thread-0|EchoClient$Commander.run] |> 请输入发送内容:
123
[Thread-0|EchoClient$Commander.run] |> 还有消息没有发送完,请稍等
[Thread-1|EchoClient$Processor.run] |> server echo:2023-03-01 18:16:28 >>123
[Thread-0|EchoClient$Commander.run] |> 请输入发送内容:
12123123123
[Thread-0|EchoClient$Commander.run] |> 还有消息没有发送完,请稍等
[Thread-1|EchoClient$Processor.run] |> server echo:2023-03-01 18:16:33 >>12123123123
多线程的Reactor
在单线程Reactor模式基础上,做如下改进:
- 将Handler处理器的执行放入线程池,多线程进行业务处理。
- 而对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。
主从多线程Reactor
mainReactor
只负责处理连接,至于真正的事件处理则交给 subReactor
线程,这样分工的好处就是各不干扰,并且 main 那里。
既然Reactor反应器和Handler处理器,挤在一个线程会造成非常严重的性能缺陷。那么, 可以使用多线程,对基础的反应器模式进行改造和演进。
多线程Reactor反应器的演进,分为两个方面:
- 首先是升级Reactor反应器。可以考虑引入多个Selector选择器,提升查询和分发大 量通道的IO事件的能力。
- 其次是升级Handler处理器。既要使用多线程,又要尽可能的高效率,则可以考虑使用线程池。
总体来说,多线程版本的反应器模式,大致如下:
- 将负责数据传输处理的
IOHandler
处理器的执行,放入独立的线程池中。这样,业务处理线程与负责新连接监听的反应器线程就能相互隔离,避免服务器的连接监听受到阻塞。 - 如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(
SubReactor
) 线程;同时,引入多个选择器,并且为每一个SubReactor
引入一个线程,一个线程负责一个 选择器的事件轮询。这样,充分释放了系统资源的能力;也大大提升反应器管理大量连接, 或者监听大量传输通道的能力。
多线程版本的Reactor反应器-回显案例
在前面的“回显服务器”(EchoServerReactor)的基础上,完成多线程Reactor 反应器 的升级。多线程反应器的实践案例设计如下:
- 引入多个选择器。
- 设计一个新的子反应器(
SubReactor
)类,一个子反应器负责查询一个选择器的 查询、分发。 -
开启多个处理线程,一个处理线程负责执行一个子反应器(
SubReactor
)的。 为了提升效率,这里由一个线程负责一个SubReactor
的所有操作,避免多个线程负责一个选择器,导致需要进行线程同步,从而引发性能下降的问题。 -
进行IO事件的分类隔离。将新连接事件
OP_ACCEPT
的反应处理,和普通的读 (OP_READ
)事件、写(OP_WRITE
)事件反应处理,进行分开隔离。 这里,专门用一个SubReactor
负责新连接事件查询和分发,防止耗时的IO操作导致新连 接事件OP_ACCEPT
事件查询发生延迟,这个专门的反应器也做bossReactor
;与之相对应的、 负责IO事件的查询、分发的反应器,叫做workReactor
。 - 将IO事件的查询、分发和处理线程隔离。具体来说,就是将Handler处理器的执行, 不放在Reactor绑定的线程上完成。实际上,在高并发、高性能的场景下,需要将耗时的处理与IO反应处理进行隔离,耗时的Handler处理器需要在专门的线程上完成,避免IO反应处理被阻塞。
但是,这里为了不至于将demo实现弄得非常复杂,暂时仅仅把业务Handler
处理的工作, 交给独立的线程池去执行;并没有把AcceptorHandler
的工作,剥离给其他的线程去执行,仍然是在Reactor
绑定的线程上完成。
一个bossReactor
负责新连接事件的反应处理(查询、 分发、处理),bossReactor
和boss
选择器进行绑定;两个workReactor
负责普通IO事件的查询和分发,分别绑定一个worker
选择器。
服务端的监听通道注册到boss选择器,而所有的Socket传输通道通过轮询策略注册到 worke选择器,从而实现了新连接监听和IO读写事件监听的线程分离。
MultiThreadEchoServerReactor
多线程反应器
class MultiThreadEchoServerReactor {
ServerSocketChannel serverSocket;
AtomicInteger next = new AtomicInteger(0);
Selector bossSelector = null;
Reactor bossReactor = null;
//selectors集合,引入多个selector选择器
Selector[] workSelectors = new Selector[2];
//引入多个子反应器
Reactor[] workReactors = null;
MultiThreadEchoServerReactor() throws IOException {
//初始化多个selector选择器
bossSelector = Selector.open();// 用于监听新连接事件
workSelectors[0] = Selector.open(); // 用于监听read、write事件
workSelectors[1] = Selector.open(); // 用于监听read、write事件
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
serverSocket.socket().bind(address);
serverSocket.configureBlocking(false);//非阻塞
//bossSelector,负责监控新连接事件, 将 serverSocket注册到bossSelector
SelectionKey sk =
serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
//绑定Handler:新连接监控handler绑定到SelectionKey(选择键)
sk.attach(new AcceptorHandler());
//bossReactor反应器,处理新连接的bossSelector
bossReactor = new Reactor(bossSelector);
//第一个子反应器,一子反应器负责一个worker选择器
Reactor workReactor1 = new Reactor(workSelectors[0]);
//第二个子反应器,一子反应器负责一个worker选择器
Reactor workReactor2 = new Reactor(workSelectors[1]);
workReactors = new Reactor[]{workReactor1, workReactor2};
}
private void startService() {
// 一子反应器对应一条线程
new Thread(bossReactor).start();
new Thread(workReactors[0]).start();
new Thread(workReactors[1]).start();
}
//反应器
class Reactor implements Runnable {
//每条线程负责一个选择器的查询
final Selector selector;
public Reactor(Selector selector) {
this.selector = selector;
}
public void run() {
try {
while (!Thread.interrupted()) {
//单位为毫秒
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (null == selectedKeys || selectedKeys.size() == 0) {
continue;
}
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
selectedKeys.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
//调用之前attach绑定到选择键的handler处理器对象
if (handler != null) {
handler.run();
}
}
}
// Handler:新连接处理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
Logger.info("接收到一个新的连接");
if (channel != null) {
int index = next.get();
Logger.info("选择器的编号:" + index);
Selector selector = workSelectors[index];
new MultiThreadEchoHandler(selector, channel);
}
} catch (IOException e) {
e.printStackTrace();
}
if (next.incrementAndGet() == workSelectors.length) {
next.set(0);
}
}
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server =
new MultiThreadEchoServerReactor();
server.startService();
}
}
MultiThreadEchoHandler
业务回显处理器
class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入线程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//仅仅取得选择键,后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将本Handler作为sk选择键的附件,方便事件dispatch
sk.attach(this);
//向sk选择键注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//唤醒 查询线程,使得OP_READ生效
selector.wakeup();
Logger.info("新的连接 注册完成");
}
public void run() {
//异步任务,在独立的线程池中执行
//提交数据传输任务到线程池
//使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
pool.execute(new AsyncTask());
}
//异步任务,不在Reactor线程中执行
//数据传输与业务处理任务,不在IO事件轮询线程中执行,在独立的线程池中执行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//异步任务的内部类
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
打印结果
[Thread-0|MultiThreadEchoServerReactor$AcceptorHandler.run] |> 接收到一个新的连接
[Thread-0|MultiThreadEchoServerReactor$AcceptorHandler.run] |> 选择器的编号:0
[Thread-0|MultiThreadEchoHandler.<init>] |> 新的连接 注册完成
[pool-1-thread-1|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-2|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-4|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-2|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-2|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-1|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-3|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-3|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-2|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[Thread-0|MultiThreadEchoServerReactor$AcceptorHandler.run] |> 接收到一个新的连接
[Thread-0|MultiThreadEchoServerReactor$AcceptorHandler.run] |> 选择器的编号:1
[Thread-0|MultiThreadEchoHandler.<init>] |> 新的连接 注册完成
[pool-1-thread-4|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-2|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-3|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-3|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[Thread-0|MultiThreadEchoServerReactor$AcceptorHandler.run] |> 接收到一个新的连接
[Thread-0|MultiThreadEchoServerReactor$AcceptorHandler.run] |> 选择器的编号:0
[Thread-0|MultiThreadEchoHandler.<init>] |> 新的连接 注册完成
[pool-1-thread-4|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
[pool-1-thread-4|MultiThreadEchoHandler.asyncRun] |> 哈哈哈哈哈
问题解答
Netty
是如何支持主从 Reactor
模型?
当接收连接的时候会建立一个 ServerSocketChannel
并注册到 mainReactor
中,同时 ServerSocketChannel
会创建一个 Channel
注册到 subReactor
中,这样就完成了主从 Recator
的绑定关系。其实就是我们的bossGroup, workerGroup
Netty
中的 main reactor
大多不能用到一个线程组,只能用到线程组里面的一个线程?指的是bossGroup
为何指定为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
因为端口号只会绑定一次,只负责接受连接并分配给worker线程
Netty
给 Channel
分配 NioEventLoop
是不同的,在Win和MAC,还有Linux上分别对应了JDK
不同的版本。
sun.nio.ch.DefaultSelectorProvider.create()
核心代码(JDK8)
Netty线程模型
Netty的线程模型是基于主从多Reactor模型
详细版
简易版
Netty 中网络的连接事件(OP_ACCEPT)
由Main Reactor 线程组实现,即 Boss Group,通常只需设置一个线程。
网络的读写操作由 Work Group ( Sub Reactor) 线程组来实现,线程的个数默认为 2 * CPU Core,一个 Channel 绑定到其中一个 Work 线程,一个 Work 线程中可以绑定多个 Channel。
在将事件处理器添加到事件链时可以指定在哪个线程池中执行,如果不指定则为IO线程中执行。
// 在这里可以指定执行的线程池
ch.pipeline().addLast(workerGroup,new NettyServerHandler());
通常业务操作会专门开辟一个线程池,那业务处理完成之后,如何将响应结果通过 IO 线程写入到网卡中呢?
业务线程调用 Channel 对象的 write 方法并不会立即写入网络,只是将数据放入一个待写入队列(缓存区),然后IO线程每次执行事件选择后,会从待写入缓存区中获取写入任务,将数据真正写入到网络中,数据到达网卡之前会经过一系列的 Channel Handler
(Netty事件传播机制),最终写入网卡。
Netty 中 IO 线程的大体工作流程
IO线程处理的关键点:
- 每一IO线程在执行上述操作时是串行执行的,即注册在一个 Selector(事件选择器)中的所有通道,同一时间只有一个通道的事件被处理。这也是为什么NIO应对大文件传输时不具备优势的根本原因。
- IO 线程在处理完所有就绪事件后,还会从任务队列(Task Queue)获取任务,例如上面中提到的业务线程在执行完业务后需要将返回结果写入网络,Netty 中所有的网络读写操作只能在IO线程中真正获得运行,故业务线程需要将带写入的响应结果封装成 Task,放入到 IO 线程任务队列中。
Netty线程模型总结
- Netty 的线程模型基于主从多Reactor模型。通常由一个线程负责处理
OP_ACCEPT
事件,拥有 CPU 核数的两倍的IO线程处理读写事件。 - 一个通道的IO操作会绑定在一个IO线程中,而一个IO线程可以注册多个通道。
- 在一个网络通信中通常会包含网络数据读写,编码、解码、业务处理。默认情况下编码、解码等操作会在IO线程中运行,但也可以指定其他线程池。
- 通常业务处理会单独开启业务线程池,但也可以进一步细化,例如心跳包可以直接在IO线程中处理,而需要再转发给业务线程池,避免线程切换。
- 在一个IO线程中所有通道的事件是串行处理的。
Netty常用协议
Netty Http 协议
Netty 来开发 TCP 协议,一般的应用场景都是客户端和服务端长连接通讯的模式,其实,除了 TCP 协议之外 Netty 还支持其他常见的应用协议,比如:Http、WebSocket 等。我们所熟悉的 Tomcat 在 6.x 之后其实底层就是基于 Netty 去实现的。
将实现一个 Demo
- 使用 Netty 开发一个 Web 服务器,端口是 9000;
- 客户端请求,则不再是使用 Netty 编写的客户端代码了,而是通过浏览器输入地址进行访问。
- 服务端响应,我们的 Web 服务器往浏览器输出信息,并且能够在浏览器上打印相关信息。
TestServer
服务模版
public class TestServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
//1.Netty提供的针对Http的编解码
pipeline.addLast(new HttpServerCodec());
//2.自定义处理Http的业务Handler
pipeline.addLast(new TestHttpServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
这个是 Netty 的基本模板类,跟我们之前写的并没有什么不同,只是它给我们提了一个特殊的类 HttpServerCodec
,从字面上都能猜到它就是针对 Http 服务的编解码器。
TestHttpServerHandler
Handler类
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if(msg instanceof HttpRequest) {
//1.打印浏览器的请求地址
System.out.println("客户端地址" + ctx.channel().remoteAddress());
//2.强制转换成HttpRequest
HttpRequest httpRequest = (HttpRequest) msg;
//3.获取uri, 过滤指定的资源
URI uri = new URI(httpRequest.uri());
if("/favicon.ico".equals(uri.getPath())) {
System.out.println("请求了 favicon.ico, 不做响应");
return;
}
//4.给浏览器发送的信息,封装成ByteBuf
ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
//5.构造一个http的相应,即 httpresponse
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
content);
//6.设置响应头信息-响应格式
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
//7.设置响应头信息-响应数据长度
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
//8.将构建好 response返回
ctx.writeAndFlush(response);
}
}
}
- 浏览器发送过来的数据,被 Netty 给封装成了 HttpObject 对象,我们需要判断 HttpObject 具体所属类型是不是 HttpRequest;
- 请求信息: 可以打印浏览器的请求信息,比如:请求地址、请求方式、请求体内容、请求头内容等;
- 响应信息: 给浏览器响应,必须构造 HttpResponse 对象,并且可以设置响应头信息、响应体信息。
Netty WebSocket 协议
在真实的开发当中,其实 WebSocket 使用非常的广泛,应用涉及通信、聊天、推送等业务,则可以使用 WebSocket 来实现,因此,WebSocket 已经是目前非常主流的浏览器和服务端建立长连接的通信技术。
应用方向一: 纯后端的应用,比如:框架、中间件通信等,则完全可以使用 Netty 来开发客户端和服务端,并且双方通过 TCP 协议来进行通信。
应用方向二: 前端(浏览器)和服务端之间通信,并且想实现类似长连接的效果,那么 WebSocket 和 Netty 则是主流,并且这部分的应用场景非常的广泛和运用非常的多。
也许很多人直接使用 Spring 封装的 WebSocket 或者其它第三方的框架(比如:netty-socketio
)去实现。这里主要讲解原生 Netty 来开发 WebSocket 协议格式的数据请求,毕竟会用和知道知其所以然还是有区别的。
主要功能如下:
- 前端页面(html)启动的时候,连接 WebSocket 服务端;
- 服务端往前端每隔 5s 推送一条消息。
MyWebSocketServer
服务模版
public class MyWebSocketServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//因为基于http协议,使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
//是以块方式写,添加ChunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler());
//http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
pipeline.addLast(new HttpObjectAggregator(8192));
//将 http协议升级为 ws协议 , 保持长连接
pipeline.addLast(new WebSocketServerProtocolHandler("/hello2"));
//自定义的handler ,处理业务逻辑
pipeline.addLast(new MyWebSocketHandler());
}
});
//启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- WebSocket 比 TCP 和 Http 协议都稍微复杂有点,它其实是 TCP 和 Http 协议的结合,首先是连接之前发送的是 Http 协议请求,但是新增 Http 所没有的附加头信息
Upgrade: WebSocket
表明是一个申请协议升级的 Http 请求。其次,真正建立连接之后,其实底层是 TCP 协议长连接; -
HttpServerCodec
将请求和应答消息解码为 HTTP 消息; -
ChunkedWriteHandler
向客户端发送 HTML5 文件; - http 数据在传输过程中是分段,当浏览器发送大量数据时,就会发出多次 http 请求,
HttpObjectAggregator
可以将 HTTP 消息的多个部分合成一条完整的 HTTP 消息; -
WebSocketServerProtocolHandler
定义了 WebSocket 的对外暴露地址。
MyWebSocketHandler
业务类
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//1.获取Channel通道
final Channel channel=ctx.channel();
//2.创建一个定时线程池
ScheduledExecutorService ses= Executors.newScheduledThreadPool(1);
//3.一秒钟之后只需,并且每隔5秒往浏览器发送数据
ses.scheduleWithFixedDelay(new Runnable() {
public void run() {
String sendTime=format.format(new Date());
channel.writeAndFlush(new TextWebSocketFrame("推送时间=" + sendTime));
}
},1,5, TimeUnit.SECONDS);
}
//接受浏览器消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("收到消息 " + msg.text());
}
//当web客户端连接后,触发方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
//当web客户端断开后,触发方法
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
其实 WebSocket 对于的 Handler 跟我们普通业务的 Handler 没有什么区别,这里主要使用定时线程池定时往浏览器推送消息,这个是传统的 Http+Ajax 请求无法实现的逆向推送效果。
前端测试代码
function WebSocketTest() {
if ("WebSocket" in window) {
var ws = new WebSocket("ws://localhost:7000/hello2");
//发送数据
ws.onopen = function() {
ws.send("发送数据");
};
//接受数据
ws.onmessage = function(evt) {
var received_msg = evt.data;
console.log(">>>"+received_msg)
};
//监听连接关闭
ws.onclose = function() {
alert("连接已关闭...");
};
} else {
alert("您的浏览器不支持 WebSocket!");
}
}
这里我使用工具测试