IO流模块:经常看、经常用、经常忘;
在IO流的网络模型中,以常见的「客户端-服务端」交互场景为例;
【资料图】
客户端与服务端进行通信「交互」,可能是同步或者异步,服务端进行「流」处理时,可能是阻塞或者非阻塞模式,当然也有自定义的业务流程需要执行,从处理逻辑看就是「读取数据-业务执行-应答写数据」的形式;
Java提供「三种」IO网络编程模型,即:「BIO同步阻塞」、「NIO同步非阻塞」、「AIO异步非阻塞」;
BIO即同步阻塞,服务端收到客户端的请求时,会启动一个线程处理,「交互」会阻塞直到整个流程结束;
这种模式如果在高并发且流程复杂耗时的场景下,客户端的请求响应会存在严重的性能问题,并且占用过多资源;
【服务端】启动ServerSocket接收客户端的请求,经过一系列逻辑之后,向客户端发送消息,注意这里线程的10秒休眠;
publicclassSocketServer01{publicstaticvoidmain(String[]args)throwsException{//1、创建Socket服务端ServerSocketserverSocket=newServerSocket(8080);//2、方法阻塞等待,直到有客户端连接Socketsocket=serverSocket.accept();//3、输入流,输出流InputStreaminStream=socket.getInputStream();OutputStreamoutStream=socket.getOutputStream();//4、数据接收和响应intreadLen=0;byte[]buf=newbyte[1024];if((readLen=inStream.read(buf))!=-1){//接收数据StringreadVar=newString(buf,0,readLen);System.out.println(\"readVar=======\"+readVar);}//响应数据Thread.sleep(10000);outStream.write(\"sever-8080-write;\".getBytes());//5、资源关闭IoClose.ioClose(outStream,inStream,socket,serverSocket);}}
【客户端】Socket连接,先向ServerSocket发送请求,再接收其响应,由于Server端模拟耗时,Client处于长时间阻塞状态;
publicclassSocketClient01{publicstaticvoidmain(String[]args)throwsException{//1、创建Socket客户端Socketsocket=newSocket(InetAddress.getLocalHost(),8080);//2、输入流,输出流OutputStreamoutStream=socket.getOutputStream();InputStreaminStream=socket.getInputStream();//3、数据发送和响应接收//发送数据outStream.write(\"client-hello\".getBytes());//接收数据intreadLen=0;byte[]buf=newbyte[1024];if((readLen=inStream.read(buf))!=-1){StringreadVar=newString(buf,0,readLen);System.out.println(\"readVar=======\"+readVar);}//4、资源关闭IoClose.ioClose(inStream,outStream,socket);}}
NIO即同步非阻塞,服务端可以实现一个线程,处理多个客户端请求连接,服务端的并发能力得到极大的提升;
这种模式下客户端的请求连接都会注册到Selector多路复用器上,多路复用器会进行轮询,对请求连接的IO流进行处理;
【服务端】单线程可以处理多个客户端请求,通过轮询多路复用器查看是否有IO请求;
publicclassSocketServer01{publicstaticvoidmain(String[]args)throwsException{try{//启动服务开启监听ServerSocketChannelsocketChannel=ServerSocketChannel.open();socketChannel.socket().bind(newInetSocketAddress(\"127.0.0.1\",8989));//设置非阻塞,接受客户端socketChannel.configureBlocking(false);//打开多路复用器Selectorselector=Selector.open();//服务端Socket注册到多路复用器,指定兴趣事件socketChannel.register(selector,SelectionKey.OP_ACCEPT);//多路复用器轮询ByteBufferbuffer=ByteBuffer.allocateDirect(1024);while(selector.select()>0){SetselectionKeys=selector.selectedKeys();IteratorselectionKeyIter=selectionKeys.iterator();while(selectionKeyIter.hasNext()){SelectionKeyselectionKey=selectionKeyIter.next();selectionKeyIter.remove();if(selectionKey.isAcceptable()){//接受新的连接SocketChannelclient=socketChannel.accept();//设置读非阻塞client.configureBlocking(false);//注册到多路复用器client.register(selector,SelectionKey.OP_READ);}elseif(selectionKey.isReadable()){//通道可读SocketChannelclient=(SocketChannel)selectionKey.channel();intlen=client.read(buffer);if(len>0){buffer.flip();byte[]readArr=newbyte[buffer.limit()];buffer.get(readArr);System.out.println(client.socket().getPort()+\"端口数据:\"+newString(readArr));buffer.clear();}}}}}catch(Exceptione){e.printStackTrace();}}}
【客户端】每隔3秒持续的向通道内写数据,服务端通过轮询多路复用器,持续的读取数据;
publicclassSocketClient01{publicstaticvoidmain(String[]args)throwsException{try{//连接服务端SocketChannelsocketChannel=SocketChannel.open();socketChannel.connect(newInetSocketAddress(\"127.0.0.1\",8989));ByteBufferwriteBuffer=ByteBuffer.allocate(1024);StringconVar=\"client-hello\";writeBuffer.put(conVar.getBytes());writeBuffer.flip();//每隔3S发送一次数据while(true){Thread.sleep(3000);writeBuffer.rewind();socketChannel.write(writeBuffer);writeBuffer.clear();}}catch(Exceptione){e.printStackTrace();}}}
AIO即异步非阻塞,对于通道内数据的「读」和「写」动作,都是采用异步的模式,对于性能的提升是巨大的;
这与常规的第三方对接模式很相似,本地服务在请求第三方服务时,请求过程耗时很大,会异步执行,第三方第一次回调,确认请求可以被执行;第二次回调则是推送处理结果,这种思想在处理复杂问题时,可以很大程度的提高性能,节省资源:
【服务端】各种「accept」、「read」、「write」动作是异步,通过Future来获取计算的结果;
publicclassSocketServer01{publicstaticvoidmain(String[]args)throwsException{//启动服务开启监听AsynchronousServerSocketChannelsocketChannel=AsynchronousServerSocketChannel.open();socketChannel.bind(newInetSocketAddress(\"127.0.0.1\",8989));//指定30秒内获取客户端连接,否则超时FutureacceptFuture=socketChannel.accept();AsynchronousSocketChannelasyChannel=acceptFuture.get(30,TimeUnit.SECONDS);if(asyChannel!=null&&asyChannel.isOpen()){//读数据ByteBufferinBuffer=ByteBuffer.allocate(1024);FuturereadResult=asyChannel.read(inBuffer);readResult.get();System.out.println(\"read:\"+newString(inBuffer.array()));//写数据inBuffer.flip();FuturewriteResult=asyChannel.write(ByteBuffer.wrap(\"server-hello\".getBytes()));writeResult.get();}//关闭资源asyChannel.close();}}
【客户端】相关「connect」、「read」、「write」方法调用是异步的,通过Future来获取计算的结果;
publicclassSocketClient01{publicstaticvoidmain(String[]args)throwsException{//连接服务端AsynchronousSocketChannelsocketChannel=AsynchronousSocketChannel.open();Futureresult=socketChannel.connect(newInetSocketAddress(\"127.0.0.1\",8989));result.get();//写数据StringconVar=\"client-hello\";ByteBufferreqBuffer=ByteBuffer.wrap(conVar.getBytes());FuturewriteFuture=socketChannel.write(reqBuffer);writeFuture.get();//读数据ByteBufferinBuffer=ByteBuffer.allocate(1024);FuturereadFuture=socketChannel.read(inBuffer);readFuture.get();System.out.println(\"read:\"+newString(inBuffer.array()));//关闭资源socketChannel.close();}}
这部分内容,可以参考「Doug Lea的《IO》」文档,查看更多细节;
Reactor模式基于事件驱动设计,也称为「反应器」模式或者「分发者」模式;服务端收到多个客户端请求后,会将请求分派给对应的线程处理;
Reactor:负责事件的监听和分发;Handler:负责处理事件,核心逻辑「read读」、「decode解码」、「compute业务计算」、「encode编码」、「send应答数据」;
【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;
【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务;
【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理;
【4】在Handler中,会完成相应的业务流程;
这种模式将所有逻辑「连接、读写、业务」放在一个线程中处理,避免多线程的通信,资源竞争等问题,但是存在明显的并发和性能问题;
【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;
【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务;
【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理;
【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理;
【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应;
这种模式将业务从Reactor单线程分离处理,可以让其更专注于事件的分发和调度,Handler使用多线程也充分的利用cpu的处理能力,导致逻辑变的更加复杂,Reactor单线程依旧存在高并发的性能问题;
【1】 MainReactor主线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;
【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,之后MainReactor将连接分配给SubReactor;
【3】如果不是连接请求事件,则MainReactor将连接分配给SubReactor,SubReactor调用当前连接的Handler来处理;
【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理;
【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应;
这种模式Reactor线程分工明确,MainReactor负责接收新的请求连接,SubReactor负责后续的交互业务,适应于高并发的处理场景,是Netty组件通信框架的所采用的模式;
【服务端】提供两个EventLoopGroup,「ParentGroup」主要是用来接收客户端的请求连接,真正的处理是转交给「ChildGroup」执行,即Reactor多线程模型;
@Slf4jpublicclassNettyServer{publicstaticvoidmain(String[]args){//EventLoop组,处理事件和IOEventLoopGroupparentGroup=newNioEventLoopGroup();EventLoopGroupchildGroup=newNioEventLoopGroup();try{//服务端启动引导类ServerBootstrapserverBootstrap=newServerBootstrap();serverBootstrap.group(parentGroup,childGroup).channel(NioServerSocketChannel.class).childHandler(newServerChannelInit());//异步IO的结果ChannelFuturechannelFuture=serverBootstrap.bind(8989).sync();channelFuture.channel().closeFuture().sync();}catch(Exceptione){e.printStackTrace();}finally{parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}}}classServerChannelInitextendsChannelInitializer{@OverrideprotectedvoidinitChannel(SocketChannelsocketChannel){//获取管道ChannelPipelinepipeline=socketChannel.pipeline();//编码、解码器pipeline.addLast(newStringDecoder(CharsetUtil.UTF_8));pipeline.addLast(newStringEncoder(CharsetUtil.UTF_8));//添加自定义的handlerpipeline.addLast(\"serverHandler\",newServerHandler());}}classServerHandlerextendsChannelInboundHandlerAdapter{/***通道读和写*/@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{System.out.println(\"Server-Msg【\"+msg+\"】\");TimeUnit.MILLISECONDS.sleep(2000);StringnowTime=DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN);ctx.channel().writeAndFlush(\"hello-client;time:\"+nowTime);ctx.fireChannelActive();}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{cause.printStackTrace();ctx.close();}}
【客户端】通过Bootstrap类,与服务器建立连接,服务端通过ServerBootstrap启动服务,绑定在8989
端口,然后服务端和客户端进行通信;
publicclassNettyClient{publicstaticvoidmain(String[]args){//EventLoop处理事件和IONioEventLoopGroupeventLoopGroup=newNioEventLoopGroup();try{//客户端通道引导Bootstrapbootstrap=newBootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(newClientChannelInit());//异步IO的结果ChannelFuturechannelFuture=bootstrap.connect(\"localhost\",8989).sync();channelFuture.channel().closeFuture().sync();}catch(Exceptione){e.printStackTrace();}finally{eventLoopGroup.shutdownGracefully();}}}classClientChannelInitextendsChannelInitializer{@OverrideprotectedvoidinitChannel(SocketChannelsocketChannel){//获取管道ChannelPipelinepipeline=socketChannel.pipeline();//编码、解码器pipeline.addLast(newStringDecoder(CharsetUtil.UTF_8));pipeline.addLast(newStringEncoder(CharsetUtil.UTF_8));//添加自定义的handlerpipeline.addLast(\"clientHandler\",newClientHandler());}}classClientHandlerextendsChannelInboundHandlerAdapter{/***通道读和写*/@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{System.out.println(\"Client-Msg【\"+msg+\"】\");TimeUnit.MILLISECONDS.sleep(2000);StringnowTime=DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN);ctx.channel().writeAndFlush(\"hello-server;time:\"+nowTime);}@OverridepublicvoidchannelActive(ChannelHandlerContextctx)throwsException{ctx.channel().writeAndFlush(\"channel...active\");}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{cause.printStackTrace();ctx.close();}}
编者按: 每一个努力生活的中国人,都是最美的奋斗者。也正是因为亿万奋斗者,才有了今日之中国。十年,致敬每一个奋斗的你。让我们一起,踔
中新网永州5月23日电 (谢助民 唐毅刚)每天上午,在湖南东安县中天职业培训学校教室,炒菜师傅唐远江都准时教学员做东安鸡。红椒丝、姜丝
中新网福州5月23日电 (郑江洛)福建省公安厅出入境管理局局长郑荫源23日在此间披露,今年以来,福建共侦破妨害国(边)境管理犯罪案件378起,
中新网福州5月23日电 (郑江洛)“2017年至2021年,全省社会治安呈现‘两降三升’良好态势。”福建省公安厅党委委员、副厅长黄华安23日在此
中新网湖州5月23日电(施紫楠 施妍 陈思谊)“等到疫情好了,我们就去找你……”23日,浙江省湖州市长兴县洪桥镇弁山村村民郑益谋一家,又
X 关闭
X 关闭