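Over the past few days our company built a simple web IM chat room: the "包打听" widget in the bottom-right corner of the 麦包包 "晒包" channel.
It uses Netty as the socket server with a Flash client, and the two exchange custom JSON text messages.
This is a first exploration of online IM, so the current implementation is fairly simple.
For now it is deployed on a single server. Multi-server communication and message queues are not covered.
The whole thing is under 1,000 lines of code. Netty really is powerful.
Key code:
Nothing special here, just the usual Netty setup.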
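import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.apache.log4j.Logger; // assumption: log4j 1.x, judging by Logger.getLogger(Class)
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class ChatRoomNioServer {
    private static final Logger logger = Logger.getLogger(ChatRoomNioServer.class);
    private static final int SERVER_PORT = Integer.parseInt(PropertiesHelp
            .getProperty("chat.server.port"));

    public void start() {
        logger.info("chatroom init...");
        // One thread pool for the boss (accept) threads, one for the worker (I/O) threads.
        ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
        bootstrap.setPipelineFactory(new ChatRoomServerPipelineFactory());
        // "child." options apply to accepted connections, not to the listening socket.
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.bind(new InetSocketAddress(SERVER_PORT));
        logger.info("chatroom running...");
    }
}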
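Because the protocol is JSON text, the pipeline is assembled from string codecs.
Upstream (inbound) handlers:
- DelimiterBasedFrameDecoder: splits the buffered bytes into frames, using \0 or \r\n as the terminator
- StringDecoder: decodes each frame's bytes into a UTF-8 string
- MessageHandler: business logic
Downstream (outbound) handlers:
- MessageEncoder: turns the outgoing JSON message into a byte stream and sends it down the outbound path
- StringEncoder: encodes strings as UTF-8 bytes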
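import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.util.CharsetUtil;

public class ChatRoomServerPipelineFactory implements ChannelPipelineFactory {
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        // Delimiters is the project's own class (shown next), not Netty's; max frame is 8192 bytes.
        pipeline.addLast("framer",
                new DelimiterBasedFrameDecoder(8192, Delimiters.zeroDelimiter()));
        pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast("messageHandler", new MessageHandler());
        pipeline.addLast("encoder", new MessageEncoder());
        return pipeline;
    }
}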
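Netty's own Delimiters class offers nulDelimiter() and lineDelimiter() separately, but no single helper that covers exactly \0 and \r\n. A small custom Delimiters class fills the gap.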
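import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

public class Delimiters {
    // Frame terminators: NUL (sent by Flash sockets) and CRLF.
    public static ChannelBuffer[] zeroDelimiter() {
        return new ChannelBuffer[] {
                ChannelBuffers.wrappedBuffer(new byte[] { '\0' }),
                ChannelBuffers.wrappedBuffer(new byte[] { '\r', '\n' }) };
    }

    private Delimiters() {
    }
}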
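To make the framing rule concrete, here is a minimal sketch in plain Java (no Netty dependency) of what splitting on \0 or \r\n amounts to. FrameSplitSketch and the "method" JSON field are illustrative names assumed for this example, not part of the project or of Netty, and the sketch ignores the maximum frame length that DelimiterBasedFrameDecoder enforces.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Netty or the original project. */
final class FrameSplitSketch {

    /** Splits raw bytes into UTF-8 frames, ending a frame at '\0' or at "\r\n". */
    static List<String> split(byte[] data) {
        List<String> frames = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        int i = 0;
        while (i < data.length) {
            boolean nul = data[i] == 0;
            boolean crlf = data[i] == '\r' && i + 1 < data.length && data[i + 1] == '\n';
            if (nul || crlf) {
                // Delimiter found: emit the bytes collected so far, without the delimiter.
                frames.add(new String(current.toByteArray(), StandardCharsets.UTF_8));
                current.reset();
                i += nul ? 1 : 2;
            } else {
                current.write(data[i]);
                i++;
            }
        }
        // Bytes after the last delimiter form an incomplete frame. A real decoder keeps
        // them buffered until more data arrives; this sketch simply drops them.
        return frames;
    }

    public static void main(String[] args) {
        byte[] wire = "{\"method\":\"C_CONN\"}\0{\"method\":\"C_JOIN\"}\r\npartial"
                .getBytes(StandardCharsets.UTF_8);
        for (String frame : split(wire)) {
            System.out.println(frame);
        }
    }
}
```

Decoding to a string only after the frame boundary is found mirrors the pipeline order above: framing first, UTF-8 decoding second, so a multi-byte character is never cut in half.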
Message handling:
- Answer Flash's policy file request
- Handle the business logic
import org.apache.log4j.Logger; // assumption: log4j 1.x
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class MessageHandler extends SimpleChannelUpstreamHandler {
    private static final Logger logger = Logger.getLogger(MessageHandler.class);
    private static final String POLICY_REQUEST = "<policy-file-request/>";
    private static final String POLICY_XML = "<?xml version=\"1.0\"?><cross-domain-policy><site-control permitted-cross-domain-policies=\"master-only\"/><allow-access-from domain=\"*\" to-ports=\"*\" /></cross-domain-policy>";

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelConnected(ctx, e);
        ChatService.initConnection(e.getChannel());
        logger.info("one user connected to server, " + ChatUserService.getOnlineUserCount()
                + " users online");
    }

    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        // Forward the close event itself (the original forwarded channelConnected here).
        super.channelClosed(ctx, e);
        ChatService.closedConnection(e.getChannel());
        logger.info("one user left server, " + ChatUserService.getOnlineUserCount()
                + " users online");
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        String msg = (String) e.getMessage();
        if (msg.equalsIgnoreCase(POLICY_REQUEST)) {
            // write() is asynchronous, so close only after the policy file has been written.
            e.getChannel().write(POLICY_XML + "\0").addListener(ChannelFutureListener.CLOSE);
        } else {
            try {
                // Kept as in the original; the constructor shown later takes (ChatConnection, Object).
                MessageDispatchHandler messageDispatchHandler = new MessageDispatchHandler(e);
                messageDispatchHandler.setListener(new ChatRoomListenerImpl());
                messageDispatchHandler.dispatch();
            } catch (Exception ex) {
                logger.error("message handler error", ex);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        logger.warn("Unexpected exception from downstream.", e.getCause());
        e.getChannel().close();
    }
}
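The policy branch of the handler above can be isolated into a small, Netty-free function, which makes the rule easy to check: a policy request gets the policy XML terminated by \0, and anything else is treated as a chat message. This is only a sketch; PolicyHandshakeSketch and replyFor are hypothetical names, and the XML is a shortened form of the constant above.

```java
import java.util.Optional;

/** Hypothetical sketch of the Flash policy handshake; names are illustrative. */
final class PolicyHandshakeSketch {
    static final String POLICY_REQUEST = "<policy-file-request/>";
    static final String POLICY_XML = "<?xml version=\"1.0\"?><cross-domain-policy>"
            + "<allow-access-from domain=\"*\" to-ports=\"*\" /></cross-domain-policy>";

    /**
     * Returns the NUL-terminated policy reply when the frame is a policy request,
     * or an empty Optional when the frame should go on to the chat dispatcher.
     */
    static Optional<String> replyFor(String frame) {
        if (POLICY_REQUEST.equalsIgnoreCase(frame)) {
            // Flash expects the reply to end with the same \0 terminator it sends.
            return Optional.of(POLICY_XML + "\0");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(replyFor("<policy-file-request/>").isPresent());
        System.out.println(replyFor("{\"method\":\"C_JOIN\"}").isPresent());
    }
}
```

After sending the reply the server closes the connection, since the policy exchange is a one-off request made before the client opens its real chat connection.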
The actual business logic is handled in MessageDispatchHandler, which routes each message to the matching listener method.
public class MessageDispatchHandler {
    private static final Logger logger = Logger.getLogger(MessageDispatchHandler.class);
    private ChatData chatData;
    private ChatConnection connection;
    // Event listener that implements the chat room operations
    private ChatRoomListener listener;

    public MessageDispatchHandler(ChatConnection connection, Object message) {
        this.chatData = ChatPackageUtil.unpackage(message);
        this.connection = connection;
    }

    public void dispatch() {
        logger.debug("current chatdata :" + chatData);
        if (listener == null) {
            throw new RuntimeException("listener must not be null");
        }
        try {
            // Route the message by its method (command) field.
            switch (chatData.getMethod()) {
            case C_CONN:
                listener.connChatRoom(connection, chatData);
                break;
            case C_JOIN:
                listener.loginChatRoom(connection, chatData);
                break;
            case C_PUBLISH_MSG:
                listener.broadcastMsg(connection, chatData);
                break;
            case C_SEND_ADMIN:
                listener.sendAdminMsg(connection, chatData);
                break;
            case C_SEND_NORMAL:
                listener.sendNormalMsg(connection, chatData);
                break;
            case C_GET_USERS:
                listener.getOnlineUserList(connection, chatData);
                break;
            default:
                throw new BusinessException("Unknown command: " + chatData.getMethod());
            }
        } catch (BusinessException e) {
            // Business errors are reported back to the client instead of dropping the connection.
            logger.info("error msg:" + e.getMessage());
            ChatService.pushErrorMsg(e.getCode(), e.getMessage(), connection);
        }
    }

    public void setListener(ChatRoomListener listener) {
        this.listener = listener;
    }
}
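Problems encountered, in summary:
- Flash socket message format. Flash terminates every request with \0, so Netty must also read messages using \0 as the delimiter.
- Flash security policy. When Flash accesses a server across domains, it automatically sends <policy-file-request/>. On receiving it, the server must respond with the matching policy file, for example: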
<cross-domain-policy>
<allow-access-from domain="*" to-ports="80-9000" />
</cross-domain-policy>
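- SQL injection. Compared with HTTP, a socket only changes the transport layer; the application's own vulnerabilities stay the same. Take particular care wherever SQL is built from message content. A mature ORM framework, or parameterized queries in general, largely avoids this class of problem.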
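For the SQL injection point, a minimal sketch of a parameterized insert with plain JDBC follows. The chat_message table, its columns, and the ChatLogDaoSketch class are assumptions made for illustration; the original post does not show its persistence code.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical DAO sketch; table and column names are assumptions. */
final class ChatLogDaoSketch {
    // The statement text is fixed; user input never becomes part of the SQL itself.
    static final String INSERT_SQL =
            "INSERT INTO chat_message (sender, body) VALUES (?, ?)";

    /** Stores one chat message and returns the number of rows inserted. */
    static int saveMessage(Connection conn, String sender, String body) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            // Values are bound as parameters, so quotes in the body stay plain data.
            ps.setString(1, sender);
            ps.setString(2, body);
            return ps.executeUpdate();
        }
    }
}
```

A message body such as `x'); DROP TABLE chat_message; --` is then stored as ordinary text, whereas concatenating it into the statement string would let it alter the query.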
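Note: the message carrier here is a plain JSON string, which is a fairly crude choice. The more common and robust approach is a head + body format, but that requires the format to be strictly agreed on up front. JSON was chosen for its flexibility.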
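For comparison, a head + body format is often implemented as a fixed-size length header followed by the payload. The sketch below assumes a 4-byte big-endian length header and a UTF-8 body; both are illustrative choices, not something the project defines. Netty 3 ships LengthFieldBasedFrameDecoder for this style of framing; the code here uses only java.nio to show the idea.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical head + body codec: 4-byte length header, then a UTF-8 body. */
final class LengthPrefixedSketch {

    /** Encodes a body as [length:int32][payload bytes]. */
    static byte[] encode(String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Reads one frame from the buffer. Returns null, leaving the buffer position
     * unchanged, when the buffer does not yet hold a complete frame.
     */
    static String decode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null; // header not complete yet
        }
        in.mark();
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("negative frame length: " + length);
        }
        if (in.remaining() < length) {
            in.reset(); // body not complete yet; wait for more bytes
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer wire = ByteBuffer.wrap(encode("{\"method\":\"C_JOIN\"}"));
        System.out.println(decode(wire));
    }
}
```

Unlike delimiter framing, the body may contain any byte, including \0, but both sides must agree on the header size and byte order in advance, which is the strict convention the note refers to.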