这几天公司做了个简单的web im聊天室。麦包包晒包频道 右下角的“包打听”。
采用netty 做socket server + flash client. 通讯采用自定义的JSON文本。
在线IM初步的探索,现阶段做的比较简单。
目前就部署1台服务器。没有考虑多机通信。没有涉及通信队列。
总共代码不到1000行。netty的确强大。
关键代码分享:
无话可说的代码,netty通用设置。
public class ChatRoomNioServer { private static final Logger logger = Logger.getLogger(ChatRoomNioServer.class); private static final Integer SERVER_PORT = Integer.parseInt(PropertiesHelp .getProperty("chat.server.port")); public void start() { logger.info("chatroom init..."); ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); bootstrap.setPipelineFactory(new ChatRoomServerPipelineFactory()); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.setOption("reuseAddress", true); bootstrap.bind(new InetSocketAddress(SERVER_PORT)); logger.info("chatroom running..."); } }
由于采用JSON协议。
上行通道:
- DelimiterBasedFrameDecoder 读取缓存时,已\0\r\n 为中止符
- StringDecoder 二进制转字符串进行UTF-8解码
- StringEncoder 字符串UTF-8编码
下行通道:
- MessageHandler 业务逻辑处理
- MessageEncoder JSON转字节流打入下行通道
public class ChatRoomServerPipelineFactory implements ChannelPipelineFactory { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.zeroDelimiter())); pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast("messageHandler", new MessageHandler()); pipeline.addLast("encoder", new MessageEncoder()); return pipeline; } }
netty没有提供\0截取。不过重写部分代码即可。
public class Delimiters { public static ChannelBuffer[] zeroDelimiter() { return new ChannelBuffer[] { ChannelBuffers.wrappedBuffer(new byte[] { '\0' }), ChannelBuffers.wrappedBuffer(new byte[] { '\r', '\n' }) }; } private Delimiters() { } }
逻辑处理:
- 处理flash 的policy file request
- 处理业务逻辑
public class MessageHandler extends SimpleChannelUpstreamHandler { private static final Logger logger = Logger.getLogger(MessageHandler.class); private static final String POLICY_REQUEST = "<policy-file-request/>"; private static final String POLICY_XML = "<?xml version=\"1.0\"?><cross-domain-policy><site-control permitted-cross-domain-policies=\"master-only\"/><allow-access-from domain=\"*\" to-ports=\"*\" /></cross-domain-policy>"; @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelConnected(ctx, e); ChatService.initConnection(e.getChannel()); logger.info("one user connection server, left " + ChatUserService.getOnlineUserCount() + " users"); } @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelConnected(ctx, e); ChatService.closedConnection(e.getChannel()); logger .info("one user left server, left " + ChatUserService.getOnlineUserCount() + " users"); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { String msg = (String) e.getMessage(); if (msg.equalsIgnoreCase(POLICY_REQUEST)) { e.getChannel().write(POLICY_XML + "\0"); e.getChannel().close(); } else { try { MessageDispatchHandler messageDispatchHandler = new MessageDispatchHandler(e); messageDispatchHandler.setListener(new ChatRoomListenerImpl()); messageDispatchHandler.dispatch(); } catch (Exception ex) { logger.error("message handler error", ex); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { logger.warn("Unexpected exception from downstream.", e.getCause()); e.getChannel().close(); } }
具体业务在 MessageDispatchHandler 处理一些业务逻辑即可。
public class MessageDispatchHandler { private static final Logger logger = Logger.getLogger(MessageDispatchHandler.class); private ChatData chatData; private ChatConnection connection; //事件监听器 private ChatRoomListener listener; public MessageDispatchHandler(ChatConnection connection, Object message) { this.chatData = ChatPackageUtil.unpackage(message); this.connection = connection; } public void dispatch() { logger.debug("current chatdata :" + chatData); if (listener == null) { throw new RuntimeException("监听器不能为空!"); } try { switch (chatData.getMethod()) { case C_CONN: listener.connChatRoom(connection, chatData); break; case C_JOIN: listener.loginChatRoom(connection, chatData); break; case C_PUBLISH_MSG: listener.broadcastMsg(connection, chatData); break; case C_SEND_ADMIN: listener.sendAdminMsg(connection, chatData); break; case C_SEND_NORMAL: listener.sendNormalMsg(connection, chatData); break; case C_GET_USERS: listener.getOnlineUserList(connection, chatData); break; default: throw new BusinessException("错误指令:" + chatData.getMethod()); } } catch (BusinessException e) { logger.info("error msg:" + e.getMessage()); ChatService.pushErrorMsg(e.getCode(), e.getMessage(), connection); } } public void setListener(ChatRoomListener listener) { this.listener = listener; } }
总结下遇到的问题:
- flash socket通讯格式。每个协议请求都会\0作为消息结尾。因此netty接收消息也必须以\0为截止符读取消息。
- flash 安全策略。flash夸域访问服务器时,会自动发<policy-file-request/> ,服务端收到消息后,必须响应对应策略文件。
<cross-domain-policy> <allow-access-from domain="*" to-ports="80-9000" /> </cross-domain-policy>
- SQL注入。Socket和Http相比。只是通讯的层次发生了改变。应用本身的漏洞仍旧没变。处理SQL的时候需要特别注意。不过用一些成熟的ORM框架可以很好的避免这类问题。
注:这里使用的是比较低级的JSON字符串作为消息载体,较为普遍和靠谱的做法是采用head+body的方式,但前提需要对格式有严格的约定。JSON灵活性较大,因此采用。
你好,Fred。今天刚看到你的网站。在浏览的时候看到netty 这一节。因为我最近也写了个用netty的项目。在看到
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {} 这个事件的时候。你做了这些处理
MessageDispatchHandler messageDispatchHandler = new MessageDispatchHandler(e);
messageDispatchHandler.setListener(new ChatRoomListenerImpl());
messageDispatchHandler.dispatch();
我查了下netty的api 没有MessageDispatchHandler这个类,很显然是你本人写的。我猜想你这个用于业务逻辑处理的类。加上了事件监听。我对你这个业务处理的类结构很感兴趣。不知道你能否公开下。或发邮件到hailuo610@qq.com 。因为我写这部分的时候感觉写的不是很好。最后向你表示感谢。并且你的小站做的很好。
谢谢关注,已经公开。主要是把数据、连接、事件都分开处理。数据的不同选择不同的事件。比较简单。此外这里做了监听器完全是多余的,实际意义不大