Netty快速入门实例-TCP服务
1、实例要求:使用IDEA创建Netty项目;
2、Netty服务器在6668端口监听,客户端能发送消息给服务器“hello,服务器”;
3、服务器可以回复消息给客户端“hello,客户端”;
4、目的:对Netty线程模型有一个初步认识,便于理解Netty模型理论;
服务器端
public class NettyServer {
public static void main(String[] args) throws Exception {
/**
* 说明
* 1、创建两个线程组bossGroup 和 workerGroup
* 2、bossGroup只是处理连接请求,真正和客户端业务处理的会交给workerGroup完成
* 3、两个都是无限循环
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
//设置两个线程组
ServerBootstrap bootstrap1 = bootstrap.group(bossGroup, workerGroup)
//使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
//设置线程队列得到的连接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持获得的连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//创建一个通道测试对象(匿名对象)
//给我们的workerGroup的EventLoop对应的管道设置处理器
.childHandler(new ChannelInitializer() {
//给pipeline设置处理器
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("......服务器 is ready...");
//绑定一个端口并且同步,生成了一个ChannelFuture对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
//优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务器端业务处理器
/**
* 1、我们自定义一个Handler,需要继承netty规定好的某个handlerAdapter(规范)
* 2、这时我们自定义一个Handler,才能称为一个Handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据(这里我们可以读取客户端发送的消息)
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
* @param msg 就是客户端发送的数据,默认是Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx = "+ctx);
//将msg转成一个ByteBuf
//ByteBuf 是Netty提供的,不是NIO的ByteBuffer
ByteBuf buf = (ByteBuf)msg;
System.out.println("客户端发送的消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}
/**
* 数据读取完毕
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
/**
* writeAndFlush 是write+flush
* 将数据写入到缓冲区,并刷新,一般讲,我们对这个发送的数据进行编码
*/
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端",CharsetUtil.UTF_8));
}
/**
* 处理异常,一般需要关闭通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
public class NettyClient {
public static void main(String[] args) {
//客户端需要一个事件循环组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象,注意客户端使用不是ServerBootstarp而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
//设置线程组
bootstrap.group(group)
//设置客户端通道的实现类(反射)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//加入自己的处理器
sc.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端 ok...");
//启动客户端去连接服务器端
//关于ChannelFuture要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
客户端业务处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪就会触发该方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client "+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务端 ", CharsetUtil.UTF_8));
}
/**
*当通道有读取事件时会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Netty快速入门实例-HTTP服务
1、Netty服务器在6668端口监听,浏览器发出请求“http://localhost:6668”;
2、服务器可以回复消息给客户端“Hello 我是服务器5”,并对特定请求资源进行过滤;
3、目的:Netty可以做Http服务开发,并且理解Handler实例和客户端及其请求的关系;
服务端代码
/**
* 服务端
*/
public class TestServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).
childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
TestServerInitializer代码
public class TestServerInitializer extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//向管道加入处理器
//得到管道
ChannelPipeline pipeline = ch.pipeline();
//加入一个netty提供的httpServerCodec codec=>[coder - decoder]
//HttpserverCodec是netty提供的处理http的 编-解码器
pipeline.addLast("MyhttpServerCodec",new HttpServerCodec());
//增加一个自定义的handler
pipeline.addLast("MyTestHttpServerHandler",new TestHttpServerHandler());
}
}
业务处理模块TestHttpServerHandler
/**
* 1、SimpleChannelInboundHandler 是 SimpleChannelInboundHandlerAdapter
* 2、HttpObject 客户端和服务端互相通讯的数据被封装成 HttpObject
*/
public class TestHttpServerHandler extends SimpleChannelInboundHandler {
//读取客户端数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
//判断msg 是不是HttpRequest
if(msg instanceof HttpRequest){
//每开一个浏览器都回产生一个新的TestHttpServerHandler,因为http协议用完就关掉
System.out.println("pipeline hashcode "+ctx.pipeline().hashCode()+",TestHttpServerHandler hashcode="+this.hashCode());
System.out.println("msg 类型="+msg.getClass());
System.out.println("客户端地址"+ctx.channel().remoteAddress());
//获取到
HttpRequest httpRequest = (HttpRequest) msg;
//获取uri,过滤指定资源
URI uri = new URI(httpRequest.uri());
if("/favicon.ico".equals(uri.getPath())){
System.out.println("请求了faviconicon,不做响应");
return;
}
//回复信息给浏览器
ByteBuf content = Unpooled.copiedBuffer("hello,我是服务器", CharsetUtil.UTF_8);
//构造一个http的响应,即httpResponse
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());
//将构建好的response返回
ctx.writeAndFlush(response);
}
}
}
测试结果