客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel
的事件轮询、事件处理是在NioEventLoop
的run
方法中,从这里我们就很容易找我服务端读流程的入口方法:processSelectedKeys()
。
从processSelectedKeys()
一直追踪下去,可以看到OP_READ
处理逻辑分支:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}
可能你会比较奇怪:为什么OP_READ
和OP_ACCEPT
都会走这个分支?
【资料图】
OP_ACCEPT
是NioServerSocketChannel
处理的事件,而OP_READ
是NioSocketChannel
处理的事件,所以,虽然它们都走这个分支,但是channel类型确是不一样的,即这里的unsafe
类型也不一样,一个是:NioMessageUnsafe
,另一个是:NioSocketChannelUnsafe
。NioServerSocketChannel
负责监听客户端连接,当有客户端连接进入时,对它来说就是有个读入消息需要被处理。这里我们是处理client channle
的OP_READ
,所以,unsafe
是NioSocketChannelUnsafe
类型实例。
AbstractNioByteChannel.NioByteUnsafe#read
方法代码如下:
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 申请ByteBuf对象 byteBuf = allocHandle.allocate(allocator); //doReadBytes(byteBuf):将数据读取到ByteBuf中 //lastBytesRead()将读取的字节数设置到lastBytesRead allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //触发pipeline channelRead事件,将读入数据ByteBuf传入到handler中 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());//判断是否继续读取 allocHandle.readComplete(); //触发pipeline channelReadComplete pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
这个方法刨除其它逻辑,关于客户端数据处理逻辑主要包括3个步骤:
allocHandle.lastBytesRead(doReadBytes(byteBuf))
:调用java api
,从channel
中读取字节数据到ByteBuf
缓存中;pipeline.fireChannelRead(byteBuf)
:触发pipeline
的channelRead
事件,并将带有读入数据的ByteBuf
通过参数传入;pipeline.fireChannelReadComplete()
:触发pipeline
的channelReadComplete
事件;调用pipeline
的fireChannelRead()
就可触发channelRead
事件在handler
之间传播,事件传播这块代码比较绕,给人感觉不停的来回调用容易绕晕,下面通过图可以更加直观的看出调用流程,再配合代码就很好理解了。
关键点就在于HandlerContext
中提供了一个静态方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
,第一个是在哪个handler
上触发事件,第二个参数就是数据本身,通过这个方法就可以指定在哪个handler
上触发channelRead
事件。由于pipeline
中的handler
是被包装成HandlerContext
放入的,所以,可以通过handler()
方法找到真正的handler
对象进行触发。
比如pipeline
的fireChannelRead()
就是触发head
的channelRead
事件,如果处理完成需要把事件继续传播给下一个handler
,就需要调用ctx.fireChannelRead(msg)
方法即可,该方法中通过next
属性获取到下一个节点,然后执行static invokeChannelRead(next, msg)
这个方法就可以将事件传播到下一个节点上。
pipeline.fireChannelRead(byteBuf)
运行完成后会调用pipeline.fireChannelReadComplete()
方法,触发channelReadComplete
事件,执行机制和channelRead
事件一样,就不再赘述。
搞清楚上面原理,就很容易理解
ctx.fireChannelRead()
和ctx.pipeline().fireChannelRead()
之间的区别了,避免误用。
上面分析的都是常规模式,没有给handler
指定额外线程情况下channelRead
和channelReadComplete
传播机制,大致如下图:
先触发channelRead
事件,按照pipeline
中顺序依次触发,当所有handler
都触发完后,再触发channelReadComplete
事件,按照pipeline
中的顺序依次触发。这些所有流程采用的都是同步方式,在同一个线程中执行,这个线程就是channel
注册的NioEventLoop
。
我们来看下static void invokeChannelRead()
这个方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}
在执行next.invokeChannelRead(m)
方法前有个executor.inEventLoop()
判断,判断当前执行线程是不是就是handler
执行所需的线程。执行handler
方法是不能随便线程都可以去执行的,必须使用handler
内部指定的executor
线程执行器中执行才行。如下图,也就是说红色框框中的内容必须在executor
线程执行器中执行,如果当前线程和handler
执行线程不是同一个,就需要进行线程切换:则调用封装成一个任务,提交到executor
的任务队列中让其执行。
executor
线程执行器是通过next.executor()
方法获取到的,从这个方法源码中可以看到获取逻辑:如果HandlerContext
中executor
有值则直接返回;否则返回channel
注册的NioEventLoop
作为线程执行器。
在添加handler
时可以指定一个EventGroup
:pipeline.addLast( bizGroup, "handler2", new OtherTest02());
,这样,再把handler
包装成HandlerContext
过程中会从这个EventGroup
根据chooser
选取策略获得一个EventLoop
赋值给executor
。
所以,从上面分析,默认情况下handler
都是在channel
注册的NioEventLoop
线程中执行的,除非在addLast
添加handloer
时特别指定。
下面我们通过一个案例分析下pipeline
线程模型,如下,给handler02
添加一个额外的线程池:
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "handler01", new OtherTest01()); pipeline.addLast( bizGroup, "handler02", new OtherTest02()); pipeline.addLast( "handler03", new OtherTest03());}
这时,channelRead
和channelReadComplete
事件触发流程见下图:
channelRead
事件执行流程说明:
handler01
的channelRead
事件,本身当前线程和handler01
是同一个线程,所以,直接调用handler#channelRead()
方法;handler01#channelRead()
方法执行完成后,事件继续向下传播,需要调用handler02#channelRead()
方法,但是handler02
执行线程并不是默认的channel
的注册线程,而是额外设置的biz
线程,需要将调用包装成一个任务提交到biz
线程的任务队列taskQueue
中,然后直接返回;biz线程执行器内部线程会一直循环从taskQueue
中获取任务执行,这样就完成了线程切换效果;当handler02#channelRead()
方法执行完成后,需要执行handler03#channelRead()
,它们又不在同一个线程中执行,这时有需要切换线程,所以会把handler03#channelRead()
的调用封装成一个任务提交到register eventLoop的taskQueue
中,待其内部线程提取执行;下面再来看下channelReadComplete
事件执行流程:
a1
将任务提交给taskQueue
任务队列后直接返回了,而不是等其执行完成再返回;a1
返回后,从源码分析来看,会立即触发channelReadComplete
事件,涉及到线程切换,同理b1
这里也是将handler02#channelReadComplete()
调用封装成任务放入到biz eventLoop
的taskQueue
中的,然后也直接返回了;这样,biz eventLoop
线程执行器taskQueue
中就有两个任务,会按照顺序依次执行:先执行channelRead()
调用,再执行channelReadComplete()
调用;执行a3、b3
时同理;从上面可以看出,Pipeline
中handler
可以在不同线程间切换得到关键是:taskQueue
。还要一点非常重要:handler
线程池执行器默认使用的channel
注册的NioEventLoop
这个,NioEventLoop
采用的是单线程工作模式,同时还需要处理selector.select()
事件轮询,所以,handler
里肯定不能有耗时、特别是IO
阻塞等操作,不然卡在handler
中,selector#select()
执行不到,无法及时接收到客户端传送过来的数据。
标签:
客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel的事件轮
“当前的(防疫)形势正在发生变化,我们会调整产品的定位,把它们作为常态化产品对待。”28日,在复星医药2022年业绩媒体沟通会上,复星医药...
如果您看的是最近一两年拍摄的电影、电视剧,会发现视频不仅仅是4K带HDR的,音轨也有变化,不再是简单的2 0两个声道,越来越多的视频开始支持
1、1 下载安装谷歌chrome浏览器2 打开浏览器,打开youku网,点击你要提取的视频3 右击鼠标。2、选择【审核元
宗喀巴大师讲“无显不入道,无密不成佛”。我们修行的次第就是在宗喀巴大师所阐述的出离心、菩提心、见空智慧的三主要道基础上进入密法,然...
我是故小陶,点击右上方“关注”,每天为你分享电影与科技的热门动态絮絮叨叨:如果说最近哪部韩剧最火,那必定是黑暗荣耀,尤其是强势上线...
在混双上输了,但是在男双和男单的比赛中,他都拿到了最后的金牌,着实让球迷们兴奋不已。我们希望樊振东在未来能够取得更大的成就,因为目前
非郑州市户口可以在郑州办无犯罪证明吗?非郑州市户籍人员可到现居住地的派出所进行开具无犯罪记录证明。非郑州市户籍人员无法通过“郑州警...
3月24日,“存量新生改造破局”东呈集团柏曼酒店投资品鉴会·华南站圆满举办。本次投资品鉴会主要聚焦酒旅市场现状,研读存量改造赛道投资...
近期天气适宜适合外出赏花我市樱桃花最佳观赏期持续至4月1日
乐居财经王敏3月24日,潍柴雷沃智慧农业科技股份有限公司(以下简称“潍柴雷沃”)发布首次公开发行股票并在创业板上市
3月26日下午,由宝鸡市政府、吉利汽车集团主办,宝鸡高新区管委会、宝鸡吉利汽车部件有限公司承办的吉利银河供应商大会暨宝鸡
22进出07(增20)发布发行公告
齐鲁网·闪电新闻3月28日讯刘楠楠,38岁,潍坊寿光市公安局城区派出所石马社区警务室社区民警。自2017年担任社区民警
上调公积金贷款额度至百万、首套房公积金贷款首付比例两成、支持公积金支付首付款,近期以来,多地在原有政策基础上进一步优化住房公积金政策
近期在各大短视频平台上都有关于“所有系统全部启动启动启动”的玩梗热潮,据了解,这句话本是动画片《超级飞侠》里的经典台词,那为何会被...
强对流天气:目前是指伴随雷暴现象的对流性大风(≥17 2m s)、冰雹、短时强降水三、灾害防御建议1 注意防范雷电可能造成的人员伤亡、设备损失
升朝阳之旗,燃青春之我3月27日早晨,寿光市世纪学校(小学)全体师生相聚在美丽的绿茵场上,迎着晨曦举行“规范日常行为,争做文明学生”...
王牌战机,关于王牌战机介绍这个很多人还不知道,我们一起来看看!1、《王牌战机》是一款2D单人射击类游戏。2、游戏大小为8 4M。关于王牌战机到
1、在这里我以AwardBIOS为例介绍一下具体步骤锁电脑的密码方法:1 开机--按Del键--进入BIOS2
一、43岁一年交12000退休拿多少一个月差不多是1500元左右,不过具体的也要看各个地区的经济情况,地区经济越好,在领取养老金的时候,可以领到
今天,有不少抖音博主称收到了官方通知,因系统升级将下架课程类虚拟商品,已购买的课程还能正常学习。因为以前抖音上课程商品走“小黄车”...
最新一代RedmiNote12的上市导致前辈的价格下跌。因此,如果您正在寻找一款高达200欧元(目前价格要低得多
1、呵呵,这个好说,下赛季的英超要等到八月中旬才开打。2、可以先看看以前的录像,网页上搜索直播吧就好了,里面的场次比较全
关于请求严格督促整治衡阳市砂石非法销售乱象的报告投诉直通车是湖南日报、华声在线、新湖南主办的投诉维权类栏目,帮助解决网上投诉,315消费投
奥地利4-1大胜阿塞拜疆后,奥地利主教练朗尼克在接受Laola1采访时谈到了萨比策,并表示来曼联后萨比策变得更自信了。朗尼克说:“萨比策现在比
1、德州尧鼎光电科技有限公司于2014年04月24日成立。2、法定代表人吴锜,公司经营范围包括:光电子器件及配件、光
抚养权是指父母对其子女的一项人身权利,抚养有婚生的抚养与非婚生的抚养之分,在现实生活中由于各种原因的出现与发生,导致父母
北京商报讯(记者赵博宇)3月27日,据北京市教委网站消息,北京市教委发布《关于做好2023年高级中等学校考试招生工作的意见》(以下简称《意见
Copyright © 2015-2022 东方服装网版权所有 备案号:沪ICP备2020036824号-8 联系邮箱:562 66 29@qq.com