netty+flash xmlsocket 在线聊天室

这几天公司做了个简单的web im聊天室。麦包包晒包频道 右下角的“包打听”。

采用netty 做socket server + flash client. 通讯采用自定义的JSON文本。

在线IM初步的探索,现阶段做的比较简单。

目前就部署1台服务器。没有考虑多机通信。没有涉及通信队列。

总共代码不到1000行。netty的确强大。

关键代码分享:

无话可说的代码,netty通用设置。

public class ChatRoomNioServer {

	private static final Logger		logger		= Logger.getLogger(ChatRoomNioServer.class);

	private static final Integer	SERVER_PORT	= Integer.parseInt(PropertiesHelp
													.getProperty("chat.server.port"));
	public void start() {
		logger.info("chatroom init...");
		ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
			Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
		bootstrap.setPipelineFactory(new ChatRoomServerPipelineFactory());

		bootstrap.setOption("child.tcpNoDelay", true);
		bootstrap.setOption("child.keepAlive", true);
		bootstrap.setOption("reuseAddress", true);
		bootstrap.bind(new InetSocketAddress(SERVER_PORT));
		logger.info("chatroom running...");
	}

}

由于采用JSON协议。

上行通道:

  1. DelimiterBasedFrameDecoder 读取缓存时,已\0\r\n 为中止符
  2. StringDecoder 二进制转字符串进行UTF-8解码
  3. StringEncoder 字符串UTF-8编码

下行通道:

  1. MessageHandler 业务逻辑处理
  2. MessageEncoder JSON转字节流打入下行通道
public class ChatRoomServerPipelineFactory implements ChannelPipelineFactory {

	@Override
	public ChannelPipeline getPipeline() throws Exception {
		ChannelPipeline pipeline = Channels.pipeline();
		pipeline
			.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.zeroDelimiter()));
		pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
		pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
		pipeline.addLast("messageHandler", new MessageHandler());
		pipeline.addLast("encoder", new MessageEncoder());
		return pipeline;
	}

}

netty没有提供\0截取。不过重写部分代码即可。

public class Delimiters {

	public static ChannelBuffer[] zeroDelimiter() {
		return new ChannelBuffer[] { ChannelBuffers.wrappedBuffer(new byte[] { '\0' }),
				ChannelBuffers.wrappedBuffer(new byte[] { '\r', '\n' }) };
	}

	private Delimiters() {

	}
}

逻辑处理:

  1. 处理flash 的policy file request
  2. 处理业务逻辑
public class MessageHandler extends SimpleChannelUpstreamHandler {

	private static final Logger	logger			= Logger.getLogger(MessageHandler.class);

	private static final String	POLICY_REQUEST	= "<policy-file-request/>";

	private static final String	POLICY_XML		= "<?xml version=\"1.0\"?><cross-domain-policy><site-control permitted-cross-domain-policies=\"master-only\"/><allow-access-from domain=\"*\" to-ports=\"*\" /></cross-domain-policy>";

	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		super.channelConnected(ctx, e);
		ChatService.initConnection(e.getChannel());
		logger.info("one user connection server, left " + ChatUserService.getOnlineUserCount()
					+ " users");
	}

	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		super.channelConnected(ctx, e);
		ChatService.closedConnection(e.getChannel());
		logger
			.info("one user left server, left " + ChatUserService.getOnlineUserCount() + " users");
	}

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		String msg = (String) e.getMessage();
		if (msg.equalsIgnoreCase(POLICY_REQUEST)) {
			e.getChannel().write(POLICY_XML + "\0");
			e.getChannel().close();
		} else {
			try {
				MessageDispatchHandler messageDispatchHandler = new MessageDispatchHandler(e);
				messageDispatchHandler.setListener(new ChatRoomListenerImpl());
				messageDispatchHandler.dispatch();
			} catch (Exception ex) {
				logger.error("message handler error", ex);
			}
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		logger.warn("Unexpected exception from downstream.", e.getCause());
		e.getChannel().close();
	}
}

具体业务在    MessageDispatchHandler 处理一些业务逻辑即可。

public class MessageDispatchHandler {

	private static final Logger	logger	= Logger.getLogger(MessageDispatchHandler.class);

	private ChatData			chatData;

	private ChatConnection		connection;

	//事件监听器
	private ChatRoomListener	listener;

	public MessageDispatchHandler(ChatConnection connection, Object message) {
		this.chatData = ChatPackageUtil.unpackage(message);
		this.connection = connection;
	}

	public void dispatch() {
		logger.debug("current chatdata :" + chatData);
		if (listener == null) {
			throw new RuntimeException("监听器不能为空!");
		}
		try {
			switch (chatData.getMethod()) {
				case C_CONN:
					listener.connChatRoom(connection, chatData);
					break;
				case C_JOIN:
					listener.loginChatRoom(connection, chatData);
					break;
				case C_PUBLISH_MSG:
					listener.broadcastMsg(connection, chatData);
					break;
				case C_SEND_ADMIN:
					listener.sendAdminMsg(connection, chatData);
					break;
				case C_SEND_NORMAL:
					listener.sendNormalMsg(connection, chatData);
					break;
				case C_GET_USERS:
					listener.getOnlineUserList(connection, chatData);
					break;
				default:
					throw new BusinessException("错误指令:" + chatData.getMethod());
			}
		} catch (BusinessException e) {
			logger.info("error msg:" + e.getMessage());
			ChatService.pushErrorMsg(e.getCode(), e.getMessage(), connection);
		}
	}

	public void setListener(ChatRoomListener listener) {
		this.listener = listener;
	}

}

 

总结下遇到的问题:

  1. flash socket通讯格式。每个协议请求都会\0作为消息结尾。因此netty接收消息也必须以\0为截止符读取消息。
  2. flash 安全策略。flash夸域访问服务器时,会自动发<policy-file-request/> ,服务端收到消息后,必须响应对应策略文件。
    <cross-domain-policy>
      <allow-access-from domain="*" to-ports="80-9000" />
    </cross-domain-policy>
  3. SQL注入。Socket和Http相比。只是通讯的层次发生了改变。应用本身的漏洞仍旧没变。处理SQL的时候需要特别注意。不过用一些成熟的ORM框架可以很好的避免这类问题。
    注:这里使用的是比较低级的JSON字符串作为消息载体,较为普遍和靠谱的做法是采用head+body的方式,但前提需要对格式有严格的约定。JSON灵活性较大,因此采用。

2 条评论。

  1. 你好,Fred。今天刚看到你的网站。在浏览的时候看到netty 这一节。因为我最近也写了个用netty的项目。在看到
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {} 这个事件的时候。你做了这些处理
    MessageDispatchHandler messageDispatchHandler = new MessageDispatchHandler(e);
    messageDispatchHandler.setListener(new ChatRoomListenerImpl());
    messageDispatchHandler.dispatch();
    我查了下netty的api 没有MessageDispatchHandler这个类,很显然是你本人写的。我猜想你这个用于业务逻辑处理的类。加上了事件监听。我对你这个业务处理的类结构很感兴趣。不知道你能否公开下。或发邮件到hailuo610@qq.com 。因为我写这部分的时候感觉写的不是很好。最后向你表示感谢。并且你的小站做的很好。

    • 谢谢关注,已经公开。主要是把数据、连接、事件都分开处理。数据的不同选择不同的事件。比较简单。此外这里做了监听器完全是多余的,实际意义不大