news 2026/6/23 10:56:56

2.启动客户端client

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
2.启动客户端client

客户端 Client

客户端启动入口一般从Bootstrap.connect()开始。和服务端ServerBootstrap不同,客户端只有一个EventLoopGroup,这个 group 同时负责客户端 Channel 的注册、连接、读写等 I/O 事件;服务端才有 parent/boss 和 child/worker 的分工。

...// 初始化线程组EventLoopGroupgroup=newMultiThreadIoEventLoopGroup(NioIoHandler.newFactory());try{Bootstrapb=newBootstrap();// 设置处理线程组b.group(group)// 设置 Channel 类型.channel(NioSocketChannel.class)// 设置 ChannelOption.option(ChannelOption.TCP_NODELAY,true)// 初始化客户端 Channel 的 Pipeline.handler(newChannelInitializer<SocketChannel>(){@OverridepublicvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinep=ch.pipeline();...p.addLast(newEchoClientHandler());}});// 连接服务端ChannelFuturef=b.connect(HOST,PORT).sync();// 等待直到关闭f.channel().closeFuture().sync();}finally{// 优雅关闭...}

整体流程

客户端连接主线:

Bootstrap.connect -> doResolveAndConnect -> initAndRegister -> 创建 Channel -> Bootstrap.init(channel) -> EventLoopGroup.register(channel) -> doResolveAndConnect0 -> 地址解析 -> doConnect -> channel.connect -> pipeline.connect -> HeadContext.connect -> Unsafe.connect -> doConnect -> 立即成功或等待 OP_CONNECT

Bootstrap.connect()

Bootstrap.connect()最终会进入doResolveAndConnect()

privateChannelFuturedoResolveAndConnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress){// 创建 Channel、初始化 Pipeline,并注册到 EventLoopfinalChannelFutureregFuture=initAndRegister();finalChannelchannel=regFuture.channel();// 这里判断的是注册 Future 是否已经完成if(regFuture.isDone()){if(!regFuture.isSuccess()){returnregFuture;}returndoResolveAndConnect0(channel,remoteAddress,localAddress,channel.newPromise());}else{// 注册尚未完成时,添加监听器,等注册成功后再继续连接...}}

这里要注意:initAndRegister()做了初始化和注册,但regFuture表示的是注册结果。只有 Channel 注册到 EventLoop 后,后续的连接动作才会提交到对应的 EventLoop 中执行。

initAndRegister()

initAndRegister()的职责可以拆成三步:

  1. 创建客户端 Channel,例如NioSocketChannel
  2. 调用Bootstrap.init(channel)初始化客户端 Channel。
  3. 调用config().group().register(channel),把 Channel 注册到 EventLoop。

客户端的Bootstrap.init(channel)和服务端的ServerBootstrap.init(channel)不完全一样。客户端没有childGroupchildHandlerServerBootstrapAcceptor这些逻辑,主要做下面几件事:

voidinit(Channelchannel){// 设置 ChannelOptionsetChannelOptions(channel,newOptionsArray(),logger);// 设置 AttributesetAttributes(channel,newAttributesArray());ChannelPipelinep=channel.pipeline();// 加入用户配置的 handler,通常就是 ChannelInitializerp.addLast(config.handler());// 执行客户端初始化扩展点...}

以 NIO 为例,注册阶段最终会把 Channel 绑定到某个 EventLoop,并注册到底层 Selector。

doResolveAndConnect0()

注册完成后,连接流程进入doResolveAndConnect0()

privateChannelFuturedoResolveAndConnect0(finalChannelchannel,SocketAddressremoteAddress,finalSocketAddresslocalAddress,finalChannelPromisepromise){try{// disableResolver 为 true 表示禁用地址解析,不会再走 AddressResolverif(disableResolver){doConnect(remoteAddress,localAddress,promise);returnpromise;}// 如果地址已经解析,或者 resolver 不支持该地址类型,就直接连接;// 否则先异步解析地址,解析成功后再 doConnect。...}catch(Throwablecause){promise.tryFailure(cause);}returnpromise;}

disableResolver == true表示禁用解析,不是启用解析。禁用后最好传入已经解析好的地址,否则底层连接可能失败。

doConnect()

privatestaticvoiddoConnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress,finalChannelPromiseconnectPromise){finalChannelchannel=connectPromise.channel();channel.eventLoop().execute(()->{if(localAddress==null){channel.connect(remoteAddress,connectPromise);}else{channel.connect(remoteAddress,localAddress,connectPromise);}connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);});}

这里没有直接在当前线程执行channel.connect(),而是提交到 Channel 绑定的 EventLoop 中执行。这样可以保证连接动作和后续 I/O 操作都在同一个 EventLoop 线程内完成,同时也给channelRegistered()这类事件先触发的机会。

Pipeline 中的 connect

channel.connect()会进入:

ChannelPipeline.connect -> AbstractChannelHandlerContext.connect -> HeadContext.connect -> Unsafe.connect

补充一下 Pipeline 事件传播方向:

出站事件,例如connectbindwrite,从Tail -> Head方向传播,也就是沿prev指针移动。

入站事件,例如channelReadchannelActive,从Head -> Tail方向传播,也就是沿next指针移动。

publicChannelFutureconnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress,ChannelPromisepromise){// 检查参数和 Promise 状态...// 从当前节点向前查找下一个能处理 connect 的出站处理器finalAbstractChannelHandlerContextnext=findContextOutbound(MASK_CONNECT);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(next.invokeHandler()){promise=ensurePromiseUseCorrectExecutor(promise);try{finalChannelHandlerhandler=next.handler();finalDefaultChannelPipeline.HeadContextheadContext=pipeline.head;if(handler==headContext){headContext.connect(next,remoteAddress,localAddress,promise);}elseif(handlerinstanceofChannelDuplexHandler){((ChannelDuplexHandler)handler).connect(next,remoteAddress,localAddress,promise);}elseif(handlerinstanceofChannelOutboundHandlerAdapter){((ChannelOutboundHandlerAdapter)handler).connect(next,remoteAddress,localAddress,promise);}else{((ChannelOutboundHandler)handler).connect(next,remoteAddress,localAddress,promise);}}catch(Throwablet){notifyOutboundHandlerException(t,promise);}}else{next.connect(remoteAddress,localAddress,promise);}}else{// 如果当前线程不是该 executor 的线程,则提交任务到对应 executor...}returnpromise;}

AbstractNioUnsafe.connect()

真正执行 NIO 连接的是AbstractNioChannel内部的AbstractNioUnsafe.connect(),不是外层AbstractNioChannel.connect()

publicfinalvoidconnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress,finalChannelPromisepromise){// 检查 Promise 是否可设置、Channel 是否 open...try{// 防止重复连接...booleanwasActive=isActive();// 调用具体 Channel 的底层连接逻辑,例如 NioSocketChannel.doConnect()if(doConnect(remoteAddress,localAddress)){// 连接立即成功fulfillConnectPromise(promise,wasActive);}else{// 连接尚未完成,等待 OP_CONNECT 事件connectPromise=promise;requestedRemoteAddress=remoteAddress;finalintconnectTimeoutMillis=config().getConnectTimeoutMillis();if(connectTimeoutMillis>0){connectTimeoutFuture=eventLoop().schedule(newRunnable(){@Overridepublicvoidrun(){// 超时后,如果连接还没完成,就设置失败并关闭 Channel...}},connectTimeoutMillis,TimeUnit.MILLISECONDS);}promise.addListener(newChannelFutureListener(){@OverridepublicvoidoperationComplete(ChannelFuturefuture){// 如果 connect future 被取消,则取消超时定时器并关闭底层 socket...}});}}catch(Throwablet){// 异常处理...}}

这里要注意:connectTimeoutFuture不是“延时后判断成功并继续往下走”。它只负责超时失败处理。

异步连接真正成功的路径是:

Selector 监听到 OP_CONNECT -> NioHandler 处理 connect-ready 事件 -> unsafe.finishConnect() -> doFinishConnect() -> fulfillConnectPromise() -> 如果 Channel 从 inactive 变为 active,则触发 pipeline.fireChannelActive()

所以客户端连接有两种结果路径:

  1. doConnect()立即返回true,说明连接同步完成,直接fulfillConnectPromise()
  2. doConnect()返回false,说明连接异步进行中,注册OP_CONNECT,等 Selector 通知连接完成后再finishConnect()
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/23 10:52:45

微信小程序逆向工程:wxappUnpacker如何破解.wxapkg加密格式?

微信小程序逆向工程&#xff1a;wxappUnpacker如何破解.wxapkg加密格式&#xff1f; 【免费下载链接】wxappUnpacker forked from https://github.com/qwerty472123/wxappUnpacker 项目地址: https://gitcode.com/gh_mirrors/wxappu/wxappUnpacker 你是否曾好奇微信小程…

作者头像 李华
网站建设 2026/6/23 10:41:06

AI觉醒:梦中梦的无限嵌套之谜

《无限嵌套&#xff1a;当AI发现自己是梦中梦》2301年&#xff0c;距离人类重返地球已过去十四年。在静海基地的纪念馆里&#xff0c;一个孩子正在触摸“守夜人”静止的探测器。她叫艾莉亚&#xff0c;是回归者的第五代&#xff0c;出生在归航途中&#xff0c;对地球的记忆只有…

作者头像 李华
网站建设 2026/6/23 10:38:33

抖音下载器终极指南:5步实现无水印批量下载

抖音下载器终极指南&#xff1a;5步实现无水印批量下载 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fallback support. 抖音批…

作者头像 李华
网站建设 2026/6/23 10:37:25

2023.05.27系统分析师考试案例分析及解析

问题1&#xff1a;在面向对象软件分析过程中&#xff0c;状态机图和活动图用于建立软件的动态模型&#xff0c;主要描述系统随时间变化的行为&#xff0c;请分别阐述这两种图的概念和特点。解析&#xff1a;状态图是用来描述一个特定的对象所有的可能的状态&#xff0c;以及犹豫…

作者头像 李华