
Netty导入依赖
<dependency>
<groupId>io.nettygroupId>
<artifactId>netty-allartifactId>
<version>4.1.20.Finalversion>
dependency>
public class NettyServer {
public static void main(String[] args) {
// 负责监听,并将NioEventLoop事件注册到workGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 负责接收NioEventLoop和网络读写
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 辅助类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128) // 线程数
.childOption(ChannelOption.SO_KEEPALIVE,true) // 存活
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind("localhost", 6668).sync();// 启动服务器
System.out.println("服务器启动了 \r\n"+new Date());
future.channel().closeFuture().sync(); // 监听关闭
} catch (Exception e) {
e.printStackTrace();
}finally {
// 优雅关闭
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 负责事件读取
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 我中有您 ,您中有我
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();
// pipeline.channel()
// channel.pipeline()
// 读取消息
ByteBuf buf = (ByteBuf) msg;
System.out.println(channel.remoteAddress() + "客户端:-->" + buf.toString(CharsetUtil.UTF_8));
}
/**
* 读取完之后做的事
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("恭喜您诞生了",CharsetUtil.UTF_8));
}
/**
* 出现异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
public class NettyClient {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bossGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture future = bootstrap.connect("localhost", 6668).sync();
System.out.println("客户端+:" + future.channel().remoteAddress() + "上线了");
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 向服务端发送消息
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8));
}
/**
* 读取服务端发送来的消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务端" + ctx.channel().remoteAddress() + "-->" + buf.toString(CharsetUtil.UTF_8));
}
/**
* 处理异常
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 启动任务队列
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("🐱11111",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("go on----");
}
提交两个任务队列
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 启动任务队列
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("🐱11111",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 启动任务队列
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(20*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("🐱2222",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("go on----");
}
第二条消息,会在30s之后发送,因为两个任务队列同属于一个线程
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 启动任务队列
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
ctx.writeAndFlush(Unpooled.copiedBuffer("🐱11111",CharsetUtil.UTF_8));
}
},5, TimeUnit.SECONDS);
System.out.println("go on----");
}
Future说明
(1)表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,比如检索计算等
(2)ChannelFuture是一个接口:public interface ChannelFuture extends Future

说明:
(1)在使用Netty进行编程时,拦截操作和转换出入站数据只需要您提供callback或者利用future即可。这使得链式操作简单、高效,并有利于编写可重用的,通用的代码
(2)Netty框架的目标就是让您的业务逻辑从网络基础应用编程分离出来,解脱出来
Future-Listener 机制
(1)当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作
(2)常见有如下操作
- 通过isDone方法来判断当前操作是否完成
- 通过isSuccess方法来判断已完成的当前操作是否成功
- 通过getCause方法来获取已完成的当前操作失败的原因
- 通过isCancelled方法来判断已完成的当前操作是否被取消
- 通过addListener方法来注册监听器,当操作已完成(isDone方法返回完成),见会通知指定的监听器,如果Future对象已完成,则通知指定的监听器
(3)举例说明
绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器逻辑
ChannelFuture future = serverBootstrap.bind("localhost", 6668).sync();// 启动服务器
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println(new Date()+"绑定端口成功");
}else{
System.out.println("绑定端口失败");
}
}
});
System.out.println("服务器启动了 \r\n"+new Date());
future.channel().closeFuture().sync(); // 监听关闭
小结:相比传统阻塞IO,执行IO操作后线程会被阻塞,直到操作完成;异步处理的好处是不会造成线程阻塞,线程在IO操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量
实例:Netty服务器在6668端口监听,浏览器发出请求“http://localhost:6668”,服务器可以回复“Hello!我是服务器”,并对特定请求资源进行过滤
public class TestServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(7777).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道中加入处理器
// 得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入一个netty提供的httpServerCodec codec => [coder - decoder] http编解码器
// 1. httpServerCodec 是netty提供的处理http的编解码器
pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
// 2. 增加一个自定义的handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
}
}
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
// 读取客户端数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
// 判断msg是不是httpRequest请求
if(msg instanceof HttpRequest){
HttpRequest httpRequest = (HttpRequest) msg;
// 获取uri
URI uri = new URI(httpRequest.uri());
if("/favicon.ico".equals(uri.getPath())){
System.out.println("请求了 favicon.ico,不做响应");
return;
}
System.out.println("msg类型="+msg.getClass());
System.out.println("客户端地址="+ctx.channel().remoteAddress());
// 回复信息给浏览器 [http协议]
ByteBuf content = Unpooled.copiedBuffer("hello,我是服务器", CharsetUtil.UTF_8);
// 构造一个http响应,即httpResponse
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
// 将构建好的response返回
ctx.writeAndFlush(response);
}
}
}
在Netty中每个channel都有且只有一个ChannelPipeline与之对应,他们的组成关系如下

public class GroupChatServer {
private int port; // 监听端口
public GroupChatServer(int port){
this.port = port;
}
// 编写run方法,处理客户端请求
public void run() throws InterruptedException {
// 创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 默认线程数=cpu核数*2
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); // 获取到pipeline
pipeline.addLast("decoder", new StringDecoder()); // 向pipeline中加入解码器
pipeline.addLast("encoder", new StringEncoder()); // 向pipeline中加入编码器
pipeline.addLast(new GroupChatServerHandler()); // 向pipeline中加入自定义的业务处理handler
}
});
System.out.println("netty服务端启动");
ChannelFuture channelFuture = bootstrap.bind(port).sync(); // 绑定端口,并启动
channelFuture.channel().closeFuture().sync(); // 监听关闭
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new GroupChatServer(7000).run(); // 启动
}
}
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
// 定义一个channel组,管理所有的channel
// GlobalEventExecutor.INSTANCE 是一个全局的事件执行器,是一个单例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
// 表示连接建立,一旦连接,第一个被执行
// 将当前channel加入到channelGroup中
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 将该客户加入聊天的消息推送给其他在线客户端
// 该方法会将channelGroup中所有的channel遍历,并发送消息
channelGroup.writeAndFlush(sdf.format(new Date())+" [客户端]"+channel.remoteAddress()+"加入聊天\n");
channelGroup.add(channel);
}
// 表示channel处于一个活动的状态
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(sdf.format(new Date())+" "+ctx.channel().remoteAddress()+"上线了~");
}
// 表示channel中有读事件
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 获取到当前channel
Channel channel = ctx.channel();
// 遍历一下channelGroup,给自己发送的消息和给其他客户端发送的消息不一样
channelGroup.forEach(ch ->{
if(ch != channel){ // 不是当前channel,直接转发
ch.writeAndFlush(sdf.format(new Date())+" [客户]"+channel.remoteAddress()+"发送消息:"+ msg + "\n");
}else{
ch.writeAndFlush(sdf.format(new Date())+" [自己]发送了消息:"+msg+"\n");
}
});
}
// 表示channel处于一个非活动状态
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(sdf.format(new Date())+" "+ctx.channel().remoteAddress()+"离线了~");
}
// 断开连接,将xx客户离开的信息推送给当前在线的客户
// 触发此方法后,会自动将断开连接的channel从channelGroup中移除
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush(sdf.format(new Date())+" [客户端]"+channel.remoteAddress()+"离开了\n");
System.out.println("channelGroup size = "+channelGroup.size());
}
// 发生异常,关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class GroupChatClient {
private final String host;
private final int port;
public GroupChatClient(String host, int port){
this.host = host;
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder()); // 向pipeline中加入解码器
pipeline.addLast("encoder", new StringEncoder()); // 向pipeline中加入编码器
pipeline.addLast(new GroupChatClientHandler()); // 向pipeline中加入自定义的业务处理handler
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
System.out.println("-----"+channel.localAddress()+"-----");
// 客户端要输入的信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextLine()){
String msg = scanner.nextLine();
channel.writeAndFlush(msg+"\r\n");
}
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
私聊实现思路:在 GroupChatServerHandler 中维护一个Map
key:userId value:channel
案例要求:
当服务器超过3s没有读时,就提示度空闲
读服务器超过5s没有写操作时,就提示写空闲
当服务器超过7s没有读或写操作时,就提示读写空闲
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)) // 在bossGroup增加一个日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// IdleStateHandler:netty提供的处理空闲状态的处理器
// long readerIdleTime, 表示多长时间没有读,就会发送一个心跳检测包检测是否连接
// long writerIdleTime, 表示多长时间没有写,就会发送一个心跳检测包检测是否连接
// long allIdleTime 表示多长时间没有读写,就会发送一个心跳检测包检测是否连接
// 当IdleStateEvent触发后,就会传递给管道下一个handler去处理(通过调用下一个handler的useEventTriggered,在该方法中
// 去处理IdleStateEvent(读空闲,写空闲,读写空闲))
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
// 加入一个对空闲检测进一步处理的handler(自定义)
pipeline.addLast(new MyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/*
* @param ctx 上下文
* @param evt 事件
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
// 将evt向下转型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress()+"--超时时间--"+eventType);
System.out.println("服务器做相应处理...");
}
}
}
要求:
1. Http协议是无状态的,浏览器和服务器之间的请求响应一次,下次会重新创建连接
2. 要求:实现基于webSocket的长连接的全双工交互
3. 改变Http协议多次请求的约束,实现长连接了,服务器可以发送消息给浏览器
4. 客户端浏览器和服务器端会相互感知,同样浏览器关闭了,服务也会感知
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)) // 在bossGroup增加一个日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 因为基于http协议,使用http的编解码器
pipeline.addLast(new HttpServerCodec());
// 是以块方式写,添加ChunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler());
// http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合
// 这就是为什么,当浏览器发送大量数据时,会发出多次http请求
pipeline.addLast(new HttpObjectAggregator(8192));
// 对应websocket,它的数据是以帧的形式传递
// 浏览器请求时 ws://localhost:7000/hello 表示请求的uri
// WebSocketServerProtocolHandler核心功能是将http协议升级为ws协议,保持长连接
// 是通过一个状态码 101 将http协议升级成ws协议的
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
pipeline.addLast(new MyTextWebSocketFrameHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
// TextWebSocketFrame 类型,表示一个文本帧
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器端收到消息:"+msg.text());
// 回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间"+ LocalDateTime.now()+" "+msg.text()));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded被调用"+ctx.channel().id().asLongText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved被调用"+ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生"+cause.getMessage());
ctx.close();
}
}
DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Titletitle>
head>
<body>
<form onsubmit="return false">
<textarea name="message" style="width:300px;height:300px;">textarea>
<input type="button" value="发送消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="width:300px;height:300px;">textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
form>
<script>
var socket;
// 判断当前浏览器是否支持websocket
if(window.WebSocket){
socket = new WebSocket("ws://localhost:7000/hello");
socket.onopen = function(ev){
var rt = document.getElementById("responseText");
rt.value = "连接开启了...";
}
socket.onclose = function(ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + '\n' + "连接关闭了...";
}
socket.onmessage = function(ev){ // 相当于消息监听器,可以接收服务器端回送的消息
var rt = document.getElementById("responseText");
rt.value = rt.value + '\n' + ev.data;
}
}else{
alert("当前浏览器不支持websocket");
}
// 发送消息到服务器
function send(msg) {
if(!window.socket){
return;
}
if(socket.readyState === WebSocket.OPEN){
// 通过Socket发送消息
socket.send(msg);
}else{
alert("连接没有开启");
}
}
script>
body>
html>