个人理解:Netty是对socket的进一步封装
public class GdsApplication {
public static void main(String[] args) {
ApplicationContext applicationContext = SpringApplication.run(GdsApplication.class, args);
new JniNettyServer("224.1.2.1", 55198).start();
}
}
@Slf4j
public class JniNettyServer {
private InetSocketAddress groupAddress;
private ChannelFuture f;
public JniNettyServer(String host, int port) {
groupAddress = new InetSocketAddress(host, port);
}
public JniNettyServer() {
}
public boolean isConnect() {
return f.channel().isActive();
}
public void oneChannlStart(int port){
try {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.handler(new JniNettyHander());
ChannelFuture sync = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void start1(){
InetSocketAddress groupAddress = new InetSocketAddress("224.1.1.5", 55198);
System.setProperty("java.net.preferIPv4Stack", "true");
EventLoopGroup group = new NioEventLoopGroup();
try {
NetworkInterface ni = NetUtil.LOOPBACK_IF;
Enumeration<InetAddress> addresses = ni.getInetAddresses();
InetAddress localAddress = null;
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if (address instanceof Inet4Address){
localAddress = address;
}
}
System.out.println(localAddress);
Bootstrap bootstrap = new Bootstrap();
//设置NioDatagramChannel
bootstrap.group(group).channel(NioDatagramChannel.class)
.localAddress(localAddress,groupAddress.getPort())
//设置Option 组播
.option(ChannelOption.IP_MULTICAST_IF, ni)
//设置Option 地址
.option(ChannelOption.IP_MULTICAST_ADDR, localAddress)
//设置地址
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new JniNettyHander());
//获取NioDatagramChannel
NioDatagramChannel channel = (NioDatagramChannel) bootstrap.bind(groupAddress.getPort()).sync().channel();
//加入组
channel.joinGroup(groupAddress, ni).sync();
//关闭Channel
channel.closeFuture().await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();//优雅退出
}
}
public void start() {
EventLoopGroup group = new NioEventLoopGroup();
try {
InetAddress localAddress= OsInfoUtils.getLinuxLocalInetAddress();
NetworkInterface ni = NetworkInterface.getByInetAddress(localAddress);
// System.setProperty("java.net.preferIPv4Stack", "true");
// NetworkInterface ni = NetUtil.LOOPBACK_IF;
// Enumeration<InetAddress> addresses = ni.getInetAddresses();
// InetAddress localAddress = null;
// while (addresses.hasMoreElements()) {
// InetAddress address = addresses.nextElement();
// if (address instanceof Inet4Address){
// localAddress = address;
// }
// }
/*String name = SystemPropManager.getInstance().getSysPropVal("UDP_NI_NAME");
NetworkInterface ni = NetworkInterface.getByName(name);
if(null==ni){
System.out.println("组播【server】NetworkInterface为空。ni:" + name);
return;
}
// NetworkInterface ni = NetUtil.LOOPBACK_IF;
Enumeration<InetAddress> addresses = ni.getInetAddresses();
InetAddress localAddress = null;
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if (address instanceof Inet4Address) {
localAddress = address;
}
}*/
Bootstrap b = new Bootstrap();
b.group(group).channelFactory(new ChannelFactory<NioDatagramChannel>() {
public NioDatagramChannel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
}).option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(2048 * 64))
.localAddress(localAddress, groupAddress.getPort()).option(ChannelOption.IP_MULTICAST_IF, ni)
.option(ChannelOption.SO_REUSEADDR, true).handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
public void initChannel(NioDatagramChannel ch) throws Exception {
ch.pipeline().addLast(new JniNettyHander());
}
});
NioDatagramChannel ch = (NioDatagramChannel) b.bind(groupAddress.getPort()).sync().channel();
ch.joinGroup(groupAddress, ni).sync();
// System.out.println("server");
// ch.closeFuture().await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
// group.shutdownGracefully();
}
}
public static void main(String[] args) {
new JniNettyServer("224.0.0.20",9000).start();
}
}
public class JniNettyHander extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket msg) throws Exception {
//业务逻辑 需要推送的消息处理
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
// We don't close the channel because we can keep serving requests.
}
}
@SpringBootApplication
public class SocketApplication {
private static final Logger log = LoggerFactory.getLogger(SocketApplication.class);
public static void main( String[] args ) {
SpringApplication.run(SocketApplication.class,args);
try {
Properties properties = PropertiesLoaderUtils.loadAllProperties("application.properties");
String host = properties.getProperty("netty.host");
String port = properties.getProperty("netty.port");
if (StringUtils.isEmpty(host) || StringUtils.isEmpty(port)) {
log.error("netty地址未配置");
System.exit(0);
}
new NettyService(host, Integer.valueOf(port)).start();
} catch (Exception e) {
log.error("Netty server start error:", e);
System.exit(0);
}
}
}
@Slf4j
public class NettyService {
private String host;
private final int port;
public NettyService(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
// 主线程组
// EventLoopGroup bossGroup = new NioEventLoopGroup();
// 工作线程组 处理读写事件
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端
Bootstrap bootstrap = new Bootstrap();
// 绑定线程池
bootstrap.group(group)
// 指定使用的channel
.channel(NioDatagramChannel.class) //使用 NioSocketChannel 作为客户端的通道实现
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024*1024))
// 绑定监听端口
.localAddress(this.host, this.port)
// 绑定客户端连接时候触发操作
.handler(new UdpServerHandler());
// 服务器异步创建绑定
ChannelFuture cf = bootstrap.bind().sync();
log.info(NettyService.class + "已启动,正在监听: " + cf.channel().localAddress());
// 关闭服务器通道 //异步关闭通道(不是直接关闭,只是监听有关闭行为后在关闭)
cf.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync(); // 释放线程池资源
// bossGroup.shutdownGracefully().sync();
}
}
}
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private static final Logger log = LoggerFactory.getLogger(UdpServerHandler.class);
private WebSocketServer webSocketServer = SpringContextUtil.getBean(WebSocketServer.class);
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
String receiveMsg = packet.content().toString(CharsetUtil.UTF_8);
//判断接受到的UDP消息是否正确(未实现)
if (!StringUtils.isEmpty(receiveMsg)) {
// log.info("Received UDP Msg:" + receiveMsg);
webSocketServer.sendInfo(receiveMsg);
} else {
log.error("Received UDP messsage error : null");
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
// We don't close the channel because we can keep serving requests.
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("与判读建立连接");
//添加到channelGroup通道组
// NettyChannelHandlerPool.channelGroup.add(ctx.channel());
// ctx.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
// ctx.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
NettyChannelComponent.ctx = ctx;
// NettyChannelComponent.clients.add(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("与判读断开连接");
//从channelGroup通道组删除
// ChannelHandlerPool.channelGroup.remove(ctx.channel());
NettyChannelComponent.ctx = null;
}
}