`
san_yun
  • 浏览: 2602280 次
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

java socket的理解-从helo world开始

 
阅读更多

之前我以为java的socket很简单,不就是创建一个socketServet,然后不断的accept么?这种代码网上很多,基本流程是这样的:

ServerSocket server = new ServerSocket(8080);
while (true) {
	Socket socket = server.accept();
	InputStream input = socket.getInputStream();
	int length = input.available();
	if(length>0){
		byte[] data = new byte[length];
		input.read(data);
		String str = new String(data);
		String result = execute(str); 
		OutputStream out = socket.getOutputStream();
		out.write(("ok"+result+"\r\n").getBytes());
		out.flush();
	}
       socket.close();
 }

虽然在做轮询,但accept()会阻塞,直到有新的客户端请求过来才会产生一个新的socket,然后读取数据,处理数据,最后关闭。基本流程是 accept()--->read()--->hand()--->close()。

 

代码看上去很简单,也能成功运行,但这段代码存在2个问题:

1. 一个socket只能处理一次就被关闭了,对于局域网的RPC来说这是很不划算的,很多时候我们希望能是长链接。处理流程变成accept()-->read()-->hand()-->read()-->hand()-->close()

2. 如果一个client在一个socket中连续发送两次数据,client端代码会报错,因为socket在处理玩第一次read数据之后就被server关闭了。

 

所以为了解决上面两个问题,我把这段代码修改一下,每个socket一个线程来处理:

  class Channel implements Runnable {

        private Socket socket;
        private ServiceRegisterTester t ;
        public Channel(Socket socket,ServiceRegisterTester t ){
            super();
            this.socket = socket;
            this.t = t;
        }

        public void run() {
            try{
                Socket socket = this.socket;
                while(true){
                    InputStream input = socket.getInputStream();
                    int length = input.available();
                    if(length>0){
                        byte[] data = new byte[length];
                        input.read(data);
                        String str = new String(data);
                        String result = t.test(str);
                        OutputStream out = socket.getOutputStream();
                        out.write(("ok"+result+"\r\n").getBytes());
                        out.flush();
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
ServerSocket server = new ServerSocket(8080);
while (true) {
    Socket socket = server.accept();
    Channel chanenl = new Channel(socket,t);
    new Thread(chanenl).start();
}
 

 这样每个socket都有一个单独的线程来处理,前面2个问题解决了!

但通过top观测发现java进程cpu占用非常高,都在做空转,每个socket都有线程在做轮询,这太伤害性能了吧,怎么才能让

socket.getInputStream();

也阻塞,让他能智能的读取到可用的数据才唤醒呢?

 

 

我相信netty也会遇到这个问题看看netty是如何解决的吧:

        ChannelFactory factory =  
                new OioServerSocketChannelFactory (  
                        Executors.newCachedThreadPool(),  
                        Executors.newCachedThreadPool());  
        
        ServerBootstrap bootstrap = new ServerBootstrap (factory);  
        DiscardServerHandler handler = new DiscardServerHandler();
        ChannelPipeline pipeline = bootstrap.getPipeline();  
        pipeline.addLast("handler", handler);  
        bootstrap.setOption("child.tcpNoDelay", true);  
        bootstrap.setOption("child.keepAlive", true);  
        bootstrap.bind(new InetSocketAddress(8080));  

 ChannelFactory需要两个线程池,一个是boss,一个是worker,boss只需要一个线程即可,worker可以根据合适的情况配置。当在执行 bootstrap.bind()的时候会启动boss线程,代码如下:

class OioServerSocketPipelineSink{

	private void bind(
		        OioServerSocketChannel channel, ChannelFuture future,
		        SocketAddress localAddress) {


	 Executor bossExecutor =
		            ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
		        bossExecutor.execute(
		                new IoWorkerRunnable(
		                        new ThreadRenamingRunnable(
		                                new Boss(channel),
		                                "Old I/O server boss (channelId: " +
		                                channel.getId() + ", " + localAddress + ')')));
		        bossStarted = true;
	}
}
 

OioServerSocketPipelineSink&Boss 是一个Runnable,其run方法如下:

 

 while (channel.isBound()) {
        try {
            Socket acceptedSocket = channel.socket.accept();
            try {
                ChannelPipeline pipeline =
                    channel.getConfig().getPipelineFactory().getPipeline();
                final OioAcceptedSocketChannel acceptedChannel =
                    new OioAcceptedSocketChannel(
                            channel,
                            channel.getFactory(),
                            pipeline,
                            OioServerSocketPipelineSink.this,
                            acceptedSocket);
                workerExecutor.execute(
                        new IoWorkerRunnable(
                                new ThreadRenamingRunnable(
                                        new OioWorker(acceptedChannel),
                                        "Old I/O server worker (parentId: " +
                                        channel.getId() + ", channelId: " +
                                        acceptedChannel.getId() + ", " +
                                        channel.getRemoteAddress() + " => " +
                                        channel.getLocalAddress() + ')')));
            } catch (Exception e) {
                logger.warn(
                        "Failed to initialize an accepted socket.", e);
                try {
                    acceptedSocket.close();
                } catch (IOException e2) {
                    logger.warn(
                            "Failed to close a partially accepted socket.",
                            e2);
                }
            }
  }

 可以看到Boss的职责就是轮询获取每个新请求Socket,立即交给workerExecutor处理,workerExecutor的逻辑单元封装在OioWorker,OioWorker的run方法如下:

public void run(){
        final PushbackInputStream in = channel.getInputStream();
        while (channel.isOpen()) {
            synchronized (channel.interestOpsLock) {
                while (!channel.isReadable()) {
                    try {
                        // notify() is not called at all.
                        // close() and setInterestOps() calls Thread.interrupt()
                        channel.interestOpsLock.wait();
                    } catch (InterruptedException e) {
                        if (!channel.isOpen()) {
                            break;
                        }
                    }
                }
            }

            byte[] buf;
            int readBytes;
            try {
                int bytesToRead = in.available();
                if (bytesToRead > 0) {
                    buf = new byte[bytesToRead];
                    readBytes = in.read(buf);
                } else {
                    int b = in.read();
                    if (b < 0) {
                        break;
                    }
                    in.unread(b);
                    continue;
                }
            } catch (Throwable t) {
                if (!channel.socket.isClosed()) {
                    fireExceptionCaught(channel, t);
                }
                break;
            }

            ChannelBuffer buffer;
            if (readBytes == buf.length) {
                buffer = ChannelBuffers.wrappedBuffer(buf);
            } else {
                // A rare case, but it sometimes happen.
                buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes);
            }

            fireMessageReceived(channel, buffer);
        }

        // Setting the workerThread to null will prevent any channel
        // operations from interrupting this thread from now on.
        channel.workerThread = null;

        // Clean up.
        close(channel, succeededFuture(channel));

}

 OioWorker里有一个非常重要的InputStream-PushbackInputStream,这个输入流能阻塞io,请看其read()方法的注释:“从此输入流中读取下一个数据字节。返回 0 到 255 范围内的 int 字节值。如果因流的末尾已到达而没有可用的字节,则返回值 -1。在输入数据可用、检测到流的末尾或者抛出异常前,此方法一直阻塞。”
netty就是利用这种方式来做轮询,而CPU又不至于空转。

 

分享到:
评论

相关推荐

    黑鲨helo-刷写第三方rec资源+步骤 实测支持安卓多版本

    黑鲨helo-刷写第三方rec资源+步骤 实测支持安卓多版本 1----请使用本人实测的资源来刷写你的机型 2----资源可以支持不同安卓版本刷写第三方rec 3-----内含刷写资源和详细刷写教程步骤 4-----完美兼容当前此安卓...

    Hello-Name-LPII:Helo名称LPII

    Hello-Name-LPII Matheus Amilton de Souza

    Helo OS 2019.0.0.2源码.zip

    HELO OS是一个小型的 32 位软盘内核操作系统,支持分页内存管理,图形界面,鼠标,键盘,控制台等驱动,以及一些应用程序。涉及的内容多,但深入不多。系统拥有独立的内核和独立的可执行程序,其可执行文件拓展名为 ...

    helo os v1.0.0.2019.1源码.zip

    HELO OS一个小型的 32 位软盘内核操作系统,支持分页内存管理,图形界面,鼠标,键盘,控制台等驱动,以及一些应用程序深入不多,但涉及内容广。统拥有独立的内核和独立的可执行程序,其可执行文件拓展名为 .HEL 。 ...

    pysmtp:卡彭将军

    usage: pysmtp.py [-h] --lookup-domain LOOKUP_DOMAIN --greeting-domain GREETING_DOMAIN [--linefeed LINEFEED] [--smtp-port PORT] [--no-ip-scan] [--no-dig] [--uses-helo | --uses-elho] Python SMTP utility...

    手机JAVA游戏破解工具

    这是一款手机JAVA游戏破解工具,使用前需安装JAVA软件。安装后可打开HELO

    python基础教程之Hello World!

    &gt;&gt;&gt;print(‘Hello World!’) 可以看到,随后在屏幕上输出: 复制代码 代码如下: Hello World! print是一个常用函数,其功能就是输出括号中得字符串。 (在Python 2.x中,print还可以是一个关键字,可写成print ...

    helo:废弃的smtps服务器库

    #helo丢弃的stmp / s服务器。 helo代表完整的helo 和ehlo部分(扩展规格)。 在发送正确的响应时,它会丢弃所有传入的命令。 用作基准测试服务器。

    r3_helo_

    jsp文件,tomcat实现javaweb文件解析

    计算机网络第六版答案

    Computer Networking: A Top-Down Approach, 6th Edition Solutions to Review Questions and Problems Version Date: May 2012 ...This document contains the solutions to review questions and problems for...

    HELO文字处理.exe

    这是一个简单的文字处理,主要针对较小运行文字处理而编写。 具有丰富的文字处理和表格处理的功能 。 最大特点是小型,欢迎大家下载和使用。

    helo-two:盖茨比学习网站

    盖茨比极简启动器 :rocket: 快速开始创建一个Gatsby网站。 使用Gatsby CLI创建一个新站点,并指定最小的启动器。 # create a new Gatsby site using the minimal starternpm init gatsby 开始开发。 导航到新站点的...

    helo:Simulation-3全栈练习

    该项目是通过。 可用脚本 在项目目录中,可以运行: npm start ... 此命令将从项目中删除单个构建依赖项。 相反,它将所有配置文件和传递依赖项(webpack,Babel,ESLint等)直接复制到您的项目中

    Hello对应的可运行jar文件

    将Hello工程打包成Hello.jar文件,点击testHello.bat,可运行hello.jar。

    TeachingHelo:仓库以跟进Helo的课程

    教学英雄 用于跟进Helo的课程的存储库。

    Clever Internet .NET Suite 6.0.26.0

    SMTP Client - HELO / EHLO mode was improved. MailMessage - the messages with international texts were decoded incorrectly (the character set field was ignored) - fixed. MailMessage - ...

    Liker Land-crx插件

    语言:English Liker.land的浏览器扩展 用于将任何网页添加到您的Liker Land阅读列表的书签的浏览器扩展程序Liker Land是新一代的内容阅读器,其中...新功能:美化用户界面并添加多语言支持(感谢@ guanyun-helo的贡献)

    comp2121-assignment1:COMP2121作业1

    HELO 返回服务器认为是您的主机,以及服务器认为是它的主机。 MAIL FROM: 指定发送电子邮件的电子邮件地址。 RCPT TO: 指定要将电子邮件发送到的电子邮件地址。 DATA 开始电子邮件内容。 QUIT 断开与服务器的...

    helo-sim:此应用程序可让您创建帖子并查看其他用户的帖子

    该项目是通过。 可用脚本 在项目目录中,可以运行: npm start 在开发模式下运行应用... 此命令将从项目中删除单个构建依赖项。 而是将所有配置文件和传递依赖项(Webpack,Babel,ESLint等)直接复制到您的项目中,

    LIB_DC005-T20_worry5hx_dc005_proteus_

    HELO MI NAME IS PANCHO NIEC

Global site tag (gtag.js) - Google Analytics