微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Java网络:即兴的Socket / InputStream

我正在 Java套接字上实现一个面向事件的层,我想知道是否有办法确定是否有待读取的数据.

我的常规方法是从套接字读入缓冲区,并在缓冲区填充给定量的字节时调用提供的回调(如果每次到达时都需要触发回调,则可以为0),但是我怀疑Java已经在为我做缓冲了.

InputStream的available()方法是否可靠?我应该只读()并在Socket上做自己的缓冲吗?或者还有另一种方式吗?

解决方法

不久,没有. available()不可靠(至少不适合我).我建议使用与Selector和SelectionKey连接的java.nio.channels.socketChannel.这个解决方案有点基于事件,但比普通的套接字更复杂.

对于客户:

>构造套接字通道(套接字),打开一个选择器(selector = Selector.open();).
>使用非阻塞socket.configureBlocking(false);
>为socket连接注册选择器socket.register(selector,SelectionKey.OP_CONNECT);
>连接socket.connect(新的InetSocketAddress(主机,端口));
>看看是否有新的selector.select();
>如果“new”表示连接成功,则注册选择器OP_READ;如果“new”指的是可用的数据,只需从套接字读取即可.

但是,为了使它具有异步性,您需要设置一个单独的线程(尽管套接字被创建为非阻塞,但线程仍会阻塞),它会检查是否已经到达某些东西.

对于服务器,有ServerSocketChannel,您可以使用OP_ACCEPT.

作为参考,这是我的代码(客户端),应该给你一个提示

private Thread readingThread = new ListeningThread();

 /**
  * Listening thread - reads messages in a separate thread so the application does not get blocked.
  */
 private class ListeningThread extends Thread {
  public void run() {
   running = true;
   try {
    while(!close) listen();
    messenger.close();
   }
   catch(ConnectException ce) {
    doNotifyConnectionFailed(ce);
   }
   catch(Exception e) {
//    e.printstacktrace();
    messenger.close();
   }
   running = false;
  }
 }

 /**
  * Connects to host and port.
  * @param host Host to connect to.
  * @param port Port of the host machine to connect to.
  */
 public void connect(String host,int port) {
  try {
   SocketChannel socket = SocketChannel.open();
   socket.configureBlocking(false);
   socket.register(this.selector,SelectionKey.OP_CONNECT);
   socket.connect(new InetSocketAddress(host,port));
  }
  catch(IOException e) {
   this.doNotifyConnectionFailed(e);
  }
 }

 /**
  * Waits for an event to happen,processes it and then returns.
  * @throws IOException when something goes wrong.
  */
 protected void listen() throws IOException {
  // see if there are any new things going on
  this.selector.select();
  // process events
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while(iter.hasNext()) {
   SelectionKey key = iter.next();
   iter.remove();
   // check validity
   if(key.isValid()) {
    // if connectable...
    if(key.isConnectable()) {
     // ...establish connection,make messenger,and notify everyone
     SocketChannel client = (SocketChannel)key.channel();
     // Now this is tricky,registering for OP_READ earlier causes the selector not to wait for incoming bytes,which results in 100% cpu usage very,very fast
     if(client!=null && client.finishConnect()) {
      client.register(this.selector,SelectionKey.OP_READ);
     }
    }
    // if readable,tell messenger to read bytes
    else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) {
     // read message here
    }
   }
  }
 }

 /**
  * Starts the client.
  */
 public void start() {
  // start a reading thread
  if(!this.running) {
   this.readingThread = new ListeningThread();
   this.readingThread.start();
  }
 }

 /**
  * Tells the client to close at nearest possible moment.
  */
 public void close() {
  this.close = true;
 }

对于服务器:

/**
  * Constructs a server.
  * @param port Port to listen to.
  * @param protocol Protocol of messages.
  * @throws IOException when something goes wrong.
  */
 public ChannelMessageServer(int port) throws IOException {
  this.server = ServerSocketChannel.open();
  this.server.configureBlocking(false);
  this.server.socket().bind(new InetSocketAddress(port));
  this.server.register(this.selector,SelectionKey.OP_ACCEPT);
 }

 /**
  * Waits for event,then exits.
  * @throws IOException when something goes wrong.
  */
 protected void listen() throws IOException {
  // see if there are any new things going on
  this.selector.select();
  // process events
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while(iter.hasNext()) {
   SelectionKey key = iter.next();
   // do something with the connected socket
   iter.remove();
   if(key.isValid()) this.process(key);
  }
 }

 /**
  * Processes a selection key.
  * @param key SelectionKey.
  * @throws IOException when something is wrong.
  */
 protected void process(SelectionKey key) throws IOException {
  // if incoming connection
  if(key.isAcceptable()) {
   // get client
   SocketChannel client = (((ServerSocketChannel)key.channel()).accept());
    try {
     client.configureBlocking(false);
     client.register(this.selector,SelectionKey.OP_READ);
    }
    catch(Exception e) {
     // catch
    }
  }
  // if readable,tell messenger to read
  else if(key.isReadable()) {
  // read
  }
 }

希望这可以帮助.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐