点击关注上方“程序员私房菜”,设为“置顶或星标”,第一时间送达实用技术干货。
文章已获得作者授权,原文地址:
xxgblog.com/2014/08/27/mina-netty-twisted-5
protobuf 是谷歌的 Protocol Buffers 的简称,用于结构化数据和字节码之间互相转换(序列化、反序列化),一般应用于网络传输,可支持多种编程语言。
protobuf 如何使用这里不再介绍,本文主要介绍在 MINA、Netty、Twisted 中如何使用 protobuf。
在前一篇文章(Netty、MINA、Twisted一起学系列04:定制自己的协议)中,有介绍到一种用一个固定为4字节的前缀Header来指定Body的字节数的一种消息分割方式,在这里同样要使用到。只是其中 Body 的内容不再是字符串,而是 protobuf 字节码。
在处理业务逻辑时,肯定不希望还要对数据进行序列化和反序列化,而是希望直接操作一个对象,那么就需要有相应的编码器和解码器,将序列化和反序列化的逻辑写在编码器和解码器中。有关编码器和解码器的实现,上一篇文章中有介绍。
Netty 包中已经自带针对 protobuf 的编码器和解码器,那么就不用再自己去实现了。而 MINA、Twisted 还需要自己去实现 protobuf 的编码器和解码器。
这里定义一个protobuf数据结构,用于描述一个学生的信息,保存为StudentMsg.proto文件:
message Student {
required int32 id = 1;
required string name = 2;
optional string email = 3;
repeated string friends = 4;
}
用 StudentMsg.proto 分别生成 Java 和 Python 代码,将代码加入到相应的项目中。生成的代码就不再贴上来了。下面分别介绍在 Netty、MINA、Twisted 如何使用 protobuf 来传输 Student 信息。
1 Netty
Netty 自带 protobuf 的编码器和解码器,分别是 ProtobufEncoder 和 ProtobufDecoder。需要注意的是,ProtobufEncoder 和 ProtobufDecoder 只负责 protobuf 的序列化和反序列化,而处理消息 Header 前缀和消息分割的还需要 LengthFieldBasedFrameDecoder 和 LengthFieldPrepender。LengthFieldBasedFrameDecoder 即用于解析消息 Header 前缀,根据 Header 中指定的 Body 字节数截取 Body,LengthFieldPrepender 用于在wirte消息时在消息前面添加一个 Header 前缀来指定 Body 字节数。
public class TcpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
pipeline.addLast("protobufDecoder",
new ProtobufDecoder(StudentMsg.Student.getDefaultInstance()));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
pipeline.addLast(new TcpServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
处理事件时,接收和发送的参数直接就是Student对象。
public class TcpServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
StudentMsg.Student student = (StudentMsg.Student) msg;
System.out.println("ID:" + student.getId());
System.out.println("Name:" + student.getName());
System.out.println("Email:" + student.getEmail());
System.out.println("Friends:");
List friends = student.getFriendsList();
for(String friend : friends) {
System.out.println(friend);
}
StudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();
builder.setId(9);
builder.setName("服务器");
builder.setEmail("[email protected]");
builder.addFriends("X");
builder.addFriends("Y");
StudentMsg.Student student2 = builder.build();
ctx.writeAndFlush(student2);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2. MINA
在 MINA 中没有针对 protobuf 的编码器和解码器,但是可以自己实现一个功能和 Netty 一样的编码器和解码器。
编码器:
public class MinaProtobufEncoder extends ProtocolEncoderAdapter {
@Override
public void encode(IoSession session, Object message,
ProtocolEncoderOutput out) throws Exception {
StudentMsg.Student student = (StudentMsg.Student) message;
byte[] bytes = student.toByteArray();
int length = bytes.length;
IoBuffer buffer = IoBuffer.allocate(length + 4);
buffer.putInt(length);
buffer.put(bytes);
buffer.flip();
out.write(buffer);
}
}
解码器:
public class MinaProtobufDecoder extends CumulativeProtocolDecoder {
@Override
protected boolean doDecode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
if (in.remaining() 4) {
return false;
} else {
in.mark();
int bodyLength = in.getInt();
if (in.remaining() in.reset();
return false;
} else {
byte[] bodyBytes = new byte[bodyLength];
in.get(bodyBytes);
StudentMsg.Student student = StudentMsg.Student.parseFrom(bodyBytes);
out.write(student);
return true;
}
}
}
}
MINA 服务器加入 protobuf 的编码器和解码器:
public class TcpServer {
public static void main(String[] args) throws IOException {
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new MinaProtobufEncoder(), new MinaProtobufDecoder()));
acceptor.setHandler(new TcpServerHandle());
acceptor.bind(new InetSocketAddress(8080));
}
}
这样,在处理业务逻辑时,就和Netty一样了:
public class TcpServerHandle extends IoHandlerAdapter {
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
cause.printStackTrace();
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
StudentMsg.Student student = (StudentMsg.Student) message;
System.out.println("ID:" + student.getId());
System.out.println("Name:" + student.getName());
System.out.println("Email:" + student.getEmail());
System.out.println("Friends:");
List friends = student.getFriendsList();
for(String friend : friends) {
System.out.println(friend);
}
StudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();
builder.setId(9);
builder.setName("服务器");
builder.setEmail("[email protected]");
builder.addFriends("X");
builder.addFriends("Y");
StudentMsg.Student student2 = builder.build();
session.write(student2);
}
}
3. Twisted
在 Twisted 中,首先定义一个 ProtobufProtocol 类,继承 Protocol 类,充当编码器和解码器。处理业务逻辑的 TcpServerHandle 类再继承 ProtobufProtocol 类,调用或重写 ProtobufProtocol 提供的方法。
from struct import pack, unpack
from twisted.internet.protocol import Factory
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
import StudentMsg_pb2
class ProtobufProtocol(Protocol):
_buffer = b""
def dataReceived(self, data):
self._buffer = self._buffer + data
while True:
if len(self._buffer) >= 4:
length, = unpack(">I", self._buffer[0:4])
if len(self._buffer) >= 4 + length:
packet = self._buffer[4:4 + length]
student = StudentMsg_pb2.Student()
student.ParseFromString(packet)
self.protobufReceived(student)
self._buffer = self._buffer[4 + length:]
else:
break;
else:
break;
def protobufReceived(self, student):
raise NotImplementedError
def sendProtobuf(self, student):
data = student.SerializeToString()
self.transport.write(pack(">I", len(data)) + data)
class TcpServerHandle(ProtobufProtocol):
def protobufReceived(self, student):
print 'ID:' + str(student.id)
print 'Name:' + student.name
print 'Email:' + student.email
print 'Friends:'
for friend in student.friends:
print friend
student2 = StudentMsg_pb2.Student()
student2.id = 9
student2.name = '服务器'.decode('UTF-8')
student2.email = '[email protected]'
student2.friends.append('X')
student2.friends.append('Y')
self.sendProtobuf(student2)
factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()
下面是Java编写的一个客户端测试程序:
public class TcpClient {
public static void main(String[] args) throws IOException {
Socket socket = null;
DataOutputStream out = null;
DataInputStream in = null;
try {
socket = new Socket("localhost", 8080);
out = new DataOutputStream(socket.getOutputStream());
in = new DataInputStream(socket.getInputStream());
StudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();
builder.setId(1);
builder.setName("客户端");
builder.setEmail("[email protected]");
builder.addFriends("A");
builder.addFriends("B");
StudentMsg.Student student = builder.build();
byte[] outputBytes = student.toByteArray();
out.writeInt(outputBytes.length);
out.write(outputBytes);
out.flush();
int bodyLength = in.readInt();
byte[] bodyBytes = new byte[bodyLength];
in.readFully(bodyBytes);
StudentMsg.Student student2 = StudentMsg.Student.parseFrom(bodyBytes);
System.out.println("Header:" + bodyLength);
System.out.println("Body:");
System.out.println("ID:" + student2.getId());
System.out.println("Name:" + student2.getName());
System.out.println("Email:" + student2.getEmail());
System.out.println("Friends:");
List friends = student2.getFriendsList();
for(String friend : friends) {
System.out.println(friend);
}
} finally {
in.close();
out.close();
socket.close();
}
}
}
用客户端分别测试上面三个TCP服务器:
服务器输出:
关注我
每天进步一点点