• Post Reply Bookmark Topic Watch Topic
  • New Topic

non blocking socket  RSS feed

 
Rajeswaranr Ramachandran
Greenhorn
Posts: 1
  • Mark post as helpful
  • send pies
  • Quote
  • Report post to moderator
How do I create a non-blocking socket in Java, similar to the select() system call?
 
sathish rontala
Greenhorn
Posts: 3
  • Mark post as helpful
  • send pies
  • Quote
  • Report post to moderator
Hi,
Check this code; it shows how to do it.




package com.atic.streaming;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Category;


/**
 * Single-threaded echo server built on a {@link Selector}, the java.nio
 * counterpart of the select() system call.
 *
 * <p>Each accepted client is sent a one-time prompt. Incoming bytes are decoded
 * as US-ASCII and accumulated per connection; whenever a chunk containing a
 * newline arrives, the accumulated text is echoed back to the client. Text
 * containing {@link #QUIT_SERVER} closes that connection, and text containing
 * {@link #SHUTDOWN} closes the connection and stops the server loop by throwing
 * {@link InterruptedException}.
 */
public class NonBlockingServer {

    /** Number of bytes read from a client channel per read call. */
    static final int BUFSIZE = 8;

    /** Text that makes the server close the sending client's connection. */
    static final String QUIT_SERVER = "quit";

    /** Text that makes the server close the connection and leave the accept loop. */
    static final String SHUTDOWN = "shutdown";

    /** Greeting written once to every newly accepted client. */
    private static final String PROMPT = "What is your name? ";

    /** Charset used for both directions so encoding and decoding agree. */
    private static final Charset ASCII = Charset.forName( "us-ascii" );

    static Category log =
        Category.getInstance( NonBlockingServer.class.getName() );

    int port = 8001;
    Selector selector = null;
    ServerSocketChannel selectableChannel = null;
    int keysAdded = 0;

    /** Creates a server that will listen on the default port (8001). */
    public NonBlockingServer() {
    }

    /**
     * Creates a server that will listen on the given port.
     *
     * @param port TCP port to bind in {@link #initialize()}
     */
    public NonBlockingServer( int port ) {
        this.port = port;
    }

    /**
     * Opens the selector and a non-blocking server channel, and binds the
     * channel to the address returned by {@link InetAddress#getLocalHost()} on
     * the configured port.
     *
     * @throws IOException if the selector or channel cannot be opened or bound
     */
    public void initialize() throws IOException {
        this.selector = SelectorProvider.provider().openSelector();
        this.selectableChannel = ServerSocketChannel.open();
        this.selectableChannel.configureBlocking( false );
        InetAddress lh = InetAddress.getLocalHost();
        InetSocketAddress isa = new InetSocketAddress( lh, this.port );
        this.selectableChannel.socket().bind( isa );
    }

    /**
     * Releases the server channel, every channel still registered with the
     * selector, and the selector itself. Safe to call when
     * {@link #initialize()} was never run or when already closed.
     *
     * @throws IOException if closing the server channel or the selector fails
     */
    public void close() throws IOException {
        try {
            if ( this.selectableChannel != null ) {
                this.selectableChannel.close();
            }
        } finally {
            if ( this.selector != null && this.selector.isOpen() ) {
                // Snapshot the key set so closing channels cannot disturb the iteration.
                Object[] keys = this.selector.keys().toArray();
                for ( int k = 0; k < keys.length; k++ ) {
                    closeQuietly( ( (SelectionKey) keys[k] ).channel() );
                }
                this.selector.close();
            }
        }
    }

    /**
     * Kept for backward compatibility; delegates to {@link #close()}.
     * Callers should invoke {@link #close()} explicitly.
     *
     * @throws IOException if releasing the resources fails
     */
    public void finalize() throws IOException {
        this.close();
    }

    /**
     * Runs the selector loop: accepts new clients and reads from clients that
     * have data available. The loop ends when a select call reports no ready
     * keys, or when a client sends the shutdown command.
     *
     * <p>An I/O failure on a single client connection drops that client only;
     * the loop keeps serving the others.
     *
     * @throws IOException          if registering the server channel or selecting fails
     * @throws InterruptedException if a client requested server shutdown
     */
    public void acceptConnections() throws IOException, InterruptedException {
        SelectionKey acceptKey =
            this.selectableChannel.register( this.selector, SelectionKey.OP_ACCEPT );

        log.debug( "Acceptor loop..." );
        while ( ( this.keysAdded = acceptKey.selector().select() ) > 0 ) {

            log.debug( "Selector returned "
                + this.keysAdded + " ready for IO operations" );

            Iterator i = this.selector.selectedKeys().iterator();
            while ( i.hasNext() ) {
                SelectionKey key = (SelectionKey) i.next();
                // Selected keys must be removed by hand or they are reported again.
                i.remove();

                // A key may have been cancelled since it was selected.
                if ( !key.isValid() ) {
                    continue;
                }

                log.debug( "Processing selection key read="
                    + key.isReadable() + " write=" + key.isWritable()
                    + " accept=" + key.isAcceptable() );

                if ( key.isAcceptable() ) {
                    this.acceptClient( (ServerSocketChannel) key.channel() );
                } else if ( key.isReadable() ) {
                    this.readFromClient( key );
                }
            }
        }

        log.debug( "End acceptor loop..." );
    }

    /**
     * Accepts one pending connection, registers it for reads only and sends
     * the prompt. Write interest is deliberately not registered: a socket is
     * writable almost all the time, which would make every select call return
     * immediately.
     */
    private void acceptClient( ServerSocketChannel server ) throws IOException {
        SocketChannel channel = server.accept();
        if ( channel == null ) {
            // Non-blocking accept: the pending connection is already gone.
            return;
        }
        try {
            channel.configureBlocking( false );
            SelectionKey readKey = channel.register( this.selector, SelectionKey.OP_READ );
            readKey.attach( new ChannelCallback( channel ) );
            this.writeMessage( channel, PROMPT );
        } catch ( IOException e ) {
            log.warn( "Dropping client that failed during setup", e );
            closeQuietly( channel );
        }
    }

    /** Reads from the client behind the key; on I/O failure drops only that client. */
    private void readFromClient( SelectionKey key ) throws InterruptedException {
        ChannelCallback callback = (ChannelCallback) key.attachment();
        try {
            this.readMessage( callback );
        } catch ( IOException e ) {
            log.warn( "Dropping client after I/O failure", e );
            key.cancel();
            closeQuietly( callback.getChannel() );
        }
    }

    /** Closes a channel, logging instead of propagating any failure. */
    private static void closeQuietly( SelectableChannel channel ) {
        try {
            channel.close();
        } catch ( IOException e ) {
            log.warn( "Error while closing channel", e );
        }
    }

    /**
     * Encodes the message as US-ASCII and writes it to the channel with a
     * single write call. On a non-blocking channel that call may accept fewer
     * bytes than requested; the remainder is not retried, and a warning is
     * logged so the loss is visible.
     *
     * @param channel destination channel
     * @param message text to send
     * @throws IOException if the write fails
     */
    public void writeMessage( SocketChannel channel, String message )
            throws IOException {
        ByteBuffer buf = ASCII.encode( message );
        int nbytes = channel.write( buf );
        log.debug( "Wrote " + nbytes + " to channel." );
        if ( buf.hasRemaining() ) {
            log.warn( buf.remaining()
                + " bytes could not be written without blocking and were dropped." );
        }
    }

    /**
     * Decodes the remaining bytes of the buffer as US-ASCII.
     *
     * @param byteBuffer buffer positioned at the first byte to decode
     * @return the decoded text
     * @throws CharacterCodingException if the bytes are not valid US-ASCII
     */
    public String decode( ByteBuffer byteBuffer )
            throws CharacterCodingException {
        CharsetDecoder decoder = ASCII.newDecoder();
        CharBuffer charBuffer = decoder.decode( byteBuffer );
        return charBuffer.toString();
    }

    /**
     * Reads up to {@link #BUFSIZE} bytes from the callback's channel and acts
     * on them. End-of-stream closes the channel. Otherwise the decoded chunk is
     * appended to the callback's accumulated text, and the commands are looked
     * up in that accumulated text so that a command split across two reads is
     * still recognised.
     *
     * @param callback per-connection state for the channel to read from
     * @throws IOException          if reading, decoding, writing or closing fails
     * @throws InterruptedException if the client sent the shutdown command
     */
    public void readMessage( ChannelCallback callback )
            throws IOException, InterruptedException {
        SocketChannel channel = callback.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate( BUFSIZE );
        int nbytes = channel.read( byteBuffer );
        if ( nbytes < 0 ) {
            // Peer closed the connection; closing the channel also cancels its key.
            log.debug( "Client closed the connection." );
            channel.close();
            return;
        }

        byteBuffer.flip();
        String result = this.decode( byteBuffer );
        log.debug( result );
        callback.append( result );

        if ( callback.contains( QUIT_SERVER ) ) {
            channel.close();
        } else if ( callback.contains( SHUTDOWN ) ) {
            // Close quietly so a close failure cannot mask the shutdown request.
            closeQuietly( channel );
            throw new InterruptedException();
        } else if ( result.indexOf( "\n" ) >= 0 ) {
            // The chunk contains a newline: echo the accumulated text back.
            callback.execute();
        }
    }

    /**
     * Per-connection state: the client channel and the text received from it
     * that has not been echoed back yet.
     */
    public class ChannelCallback {
        private SocketChannel channel;
        private StringBuffer buffer;

        /**
         * @param channel the client channel this state belongs to
         */
        public ChannelCallback( SocketChannel channel ) {
            this.channel = channel;
            this.buffer = new StringBuffer();
        }

        /**
         * Writes the accumulated text back to the client and clears it.
         *
         * @throws IOException if the write fails
         */
        public void execute() throws IOException {
            String text = this.buffer.toString();
            log.debug( text );
            writeMessage( this.channel, text );
            this.buffer = new StringBuffer();
        }

        /** @return the client channel */
        public SocketChannel getChannel() {
            return this.channel;
        }

        /**
         * Appends received text to the accumulated text.
         *
         * @param values text to append
         */
        public void append( String values ) {
            this.buffer.append( values );
        }

        /**
         * Tells whether the accumulated text contains the given text.
         *
         * @param text text to look for
         * @return true if the accumulated text contains {@code text}
         */
        public boolean contains( String text ) {
            return this.buffer.indexOf( text ) >= 0;
        }
    }

    /**
     * Starts a server on the default port and runs it until the loop ends,
     * then releases its resources.
     *
     * @param args ignored
     */
    public static void main( String[] args ) {
        BasicConfigurator.configure();

        NonBlockingServer nbServer = new NonBlockingServer();

        try {
            nbServer.initialize();
        } catch ( IOException e ) {
            log.error( "Could not initialize server", e );
            System.exit( -1 );
        }

        try {
            nbServer.acceptConnections();
        } catch ( IOException e ) {
            log.error( "Server terminated by I/O failure", e );
        } catch ( InterruptedException e ) {
            log.info( "Exiting normally..." );
        } finally {
            try {
                nbServer.close();
            } catch ( IOException e ) {
                log.warn( "Error while releasing server resources", e );
            }
        }
    }

}
 
It is sorta covered in the JavaRanch Style Guide.
  • Post Reply Bookmark Topic Watch Topic
  • New Topic
Boost this thread!