标签存档: netty

Java WebSocket 开发 Webbit

webbit是基于netty扩展的websocket工具。可以大大简化websocket开发。

项目地址:https://github.com/webbit/webbit

使用说明:https://github.com/webbit/webbit/blob/master/README.md

本文权当翻译,高手直接进上面链接

一些题外话:

  1. websocket和我一开始想象的TCP应用不同。websocket和传统意义上的socket通信不一样。本质上还是HTTP的扩展。
  2. websocket协议目前还没有定稿。目前主要有3个版本的协议在使用。且都是草案。webbit都实现了3个草案。具体参阅维基http://en.wikipedia.org/wiki/WebSocket

快速开始

Maven配置

<dependency>
	<groupId>org.webbitserver</groupId>
	<artifactId>webbit</artifactId>
	<version>0.4.7</version>
</dependency>

 配置端口8080.并配置websocket路径/socket的handler

public class WebSocketServer{

	public static void main(String[] args) {
		WebServer webServer = WebServers.createWebServer(8080)
	    .add(new StaticFileHandler("/socket"));
		webServer.start();
	}

}

 编写/socket handler

public class WebSocketHandler  extends BaseWebSocketHandler{

    private int connectionCount;

    public void onOpen(WebSocketConnection connection) {
        connection.send("Hello! There are " + connectionCount + " other connections active");
        connectionCount++;
    }

    public void onClose(WebSocketConnection connection) {
        connectionCount--;
    }

    public void onMessage(WebSocketConnection connection, String message) {
        connection.send(message.toUpperCase()); // echo back message in upper case
    }

}

到此为止。基本代码已经都好了。感觉更写个servlet一样方便。

下面是客户端代码:

<html>
  <body>

    <!-- Send text to websocket -->
    <input id="userInput" type="text">
    <button onclick="ws.send(document.getElementById('userInput').value)">Send</button>

    <!-- Results -->
    <div id="message"></div>

    <script>
      function showMessage(text) {
        document.getElementById('message').innerHTML = text;
      }

      var ws = new WebSocket('ws://' + document.location.host + '/hellowebsocket');
      showMessage('Connecting...');
      ws.onopen = function() { showMessage('Connected!'); };
      ws.onclose = function() { showMessage('Lost connection'); };
      ws.onmessage = function(msg) { showMessage(msg.data); };
    </script>
  </body>
</html>

到此为止。websocket基本功能都已经实现。

 

 

netty+flash xmlsocket 在线聊天室

这几天公司做了个简单的web im聊天室。麦包包晒包频道 右下角的“包打听”。

采用netty 做socket server + flash client. 通讯采用自定义的JSON文本。

在线IM初步的探索,现阶段做的比较简单。

目前就部署1台服务器。没有考虑多机通信。没有涉及通信队列。

总共代码不到1000行。netty的确强大。

关键代码分享:

无话可说的代码,netty通用设置。

public class ChatRoomNioServer {

	private static final Logger		logger		= Logger.getLogger(ChatRoomNioServer.class);

	private static final Integer	SERVER_PORT	= Integer.parseInt(PropertiesHelp
													.getProperty("chat.server.port"));
	public void start() {
		logger.info("chatroom init...");
		ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
			Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
		bootstrap.setPipelineFactory(new ChatRoomServerPipelineFactory());

		bootstrap.setOption("child.tcpNoDelay", true);
		bootstrap.setOption("child.keepAlive", true);
		bootstrap.setOption("reuseAddress", true);
		bootstrap.bind(new InetSocketAddress(SERVER_PORT));
		logger.info("chatroom running...");
	}

}

由于采用JSON协议。

上行通道:

  1. DelimiterBasedFrameDecoder 读取缓存时,已\0\r\n 为中止符
  2. StringDecoder 二进制转字符串进行UTF-8解码
  3. StringEncoder 字符串UTF-8编码

下行通道:

  1. MessageHandler 业务逻辑处理
  2. MessageEncoder JSON转字节流打入下行通道
public class ChatRoomServerPipelineFactory implements ChannelPipelineFactory {

	@Override
	public ChannelPipeline getPipeline() throws Exception {
		ChannelPipeline pipeline = Channels.pipeline();
		pipeline
			.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.zeroDelimiter()));
		pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
		pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
		pipeline.addLast("messageHandler", new MessageHandler());
		pipeline.addLast("encoder", new MessageEncoder());
		return pipeline;
	}

}

netty没有提供\0截取。不过重写部分代码即可。

public class Delimiters {

	public static ChannelBuffer[] zeroDelimiter() {
		return new ChannelBuffer[] { ChannelBuffers.wrappedBuffer(new byte[] { '\0' }),
				ChannelBuffers.wrappedBuffer(new byte[] { '\r', '\n' }) };
	}

	private Delimiters() {

	}
}

逻辑处理:

  1. 处理flash 的policy file request
  2. 处理业务逻辑
public class MessageHandler extends SimpleChannelUpstreamHandler {

	private static final Logger	logger			= Logger.getLogger(MessageHandler.class);

	private static final String	POLICY_REQUEST	= "<policy-file-request/>";

	private static final String	POLICY_XML		= "<?xml version=\"1.0\"?><cross-domain-policy><site-control permitted-cross-domain-policies=\"master-only\"/><allow-access-from domain=\"*\" to-ports=\"*\" /></cross-domain-policy>";

	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		super.channelConnected(ctx, e);
		ChatService.initConnection(e.getChannel());
		logger.info("one user connection server, left " + ChatUserService.getOnlineUserCount()
					+ " users");
	}

	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		super.channelConnected(ctx, e);
		ChatService.closedConnection(e.getChannel());
		logger
			.info("one user left server, left " + ChatUserService.getOnlineUserCount() + " users");
	}

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		String msg = (String) e.getMessage();
		if (msg.equalsIgnoreCase(POLICY_REQUEST)) {
			e.getChannel().write(POLICY_XML + "\0");
			e.getChannel().close();
		} else {
			try {
				MessageDispatchHandler messageDispatchHandler = new MessageDispatchHandler(e);
				messageDispatchHandler.setListener(new ChatRoomListenerImpl());
				messageDispatchHandler.dispatch();
			} catch (Exception ex) {
				logger.error("message handler error", ex);
			}
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		logger.warn("Unexpected exception from downstream.", e.getCause());
		e.getChannel().close();
	}
}

具体业务在    MessageDispatchHandler 处理一些业务逻辑即可。

public class MessageDispatchHandler {

	private static final Logger	logger	= Logger.getLogger(MessageDispatchHandler.class);

	private ChatData			chatData;

	private ChatConnection		connection;

	//事件监听器
	private ChatRoomListener	listener;

	public MessageDispatchHandler(ChatConnection connection, Object message) {
		this.chatData = ChatPackageUtil.unpackage(message);
		this.connection = connection;
	}

	public void dispatch() {
		logger.debug("current chatdata :" + chatData);
		if (listener == null) {
			throw new RuntimeException("监听器不能为空!");
		}
		try {
			switch (chatData.getMethod()) {
				case C_CONN:
					listener.connChatRoom(connection, chatData);
					break;
				case C_JOIN:
					listener.loginChatRoom(connection, chatData);
					break;
				case C_PUBLISH_MSG:
					listener.broadcastMsg(connection, chatData);
					break;
				case C_SEND_ADMIN:
					listener.sendAdminMsg(connection, chatData);
					break;
				case C_SEND_NORMAL:
					listener.sendNormalMsg(connection, chatData);
					break;
				case C_GET_USERS:
					listener.getOnlineUserList(connection, chatData);
					break;
				default:
					throw new BusinessException("错误指令:" + chatData.getMethod());
			}
		} catch (BusinessException e) {
			logger.info("error msg:" + e.getMessage());
			ChatService.pushErrorMsg(e.getCode(), e.getMessage(), connection);
		}
	}

	public void setListener(ChatRoomListener listener) {
		this.listener = listener;
	}

}

 

总结下遇到的问题:

  1. flash socket通讯格式。每个协议请求都会\0作为消息结尾。因此netty接收消息也必须以\0为截止符读取消息。
  2. flash 安全策略。flash夸域访问服务器时,会自动发<policy-file-request/> ,服务端收到消息后,必须响应对应策略文件。
    <cross-domain-policy>
      <allow-access-from domain="*" to-ports="80-9000" />
    </cross-domain-policy>
  3. SQL注入。Socket和Http相比。只是通讯的层次发生了改变。应用本身的漏洞仍旧没变。处理SQL的时候需要特别注意。不过用一些成熟的ORM框架可以很好的避免这类问题。
    注:这里使用的是比较低级的JSON字符串作为消息载体,较为普遍和靠谱的做法是采用head+body的方式,但前提需要对格式有严格的约定。JSON灵活性较大,因此采用。