(接上文)
四. 接着来讲讲数据的读和写:
当建立连接成功后,就会循环调用这么一个方法:
//读取http头部
- (void)_readHTTPHeader;
{
if (_receivedHTTPHeaders == NULL) {
//序列化的http消息
_receivedHTTPHeaders = CFHTTPMessageCreateEmpty(NULL, NO);
}
//不停的add consumer去读数据
[self _readUntilHeaderCompleteWithCallback:^(SRWebSocket *self, NSData *data) {
//拼接数据,拼到头部
CFHTTPMessageAppendBytes(_receivedHTTPHeaders, (const UInt8 *)data.bytes, data.length);
//判断是否接受完
if (CFHTTPMessageIsHeaderComplete(_receivedHTTPHeaders)) {
SRFastLog(@"Finished reading headers %@", CFBridgingRelease(CFHTTPMessageCopyAllHeaderFields(_receivedHTTPHeaders)));
[self _HTTPHeadersDidFinish];
} else {
//没读完递归调
[self _readHTTPHeader];
}
}];
}
记得楼主之前写过一篇即时通讯下数据粘包、断包处理实例(基于CocoaAsyncSocket),因此抛出一个问题,WebSocket需要处理数据的断包和粘包么?
答案是基本不需要。引用知乎上的一段回答:
RFC规范指出,WebSocket是一个message-based的协议,它可以自动将数据分片,并且自动将分片的数据组装。
也就是说,WebSocket的RFC标准是不会产生粘包、断包问题的。无需应用层开发人员关心缓存以及手工组装message。
然而理想与现实的不一致:RFC规范与实现的不一致,现实当中有几个问题:
每个message可以是一个或多个分片。message不记录长度,分片才记录长度。
message最大的长度可以达到 9,223,372,036,854,775,807 字节,是由于Payload的数据长度有63bit的限制。
很多WebSocket的实现其实并不按照标准的RFC实现完全,很多仅仅实现了50%就拿来用了。这就导致了,在WebSocket实现上的最大长度很难达到这个大小,于是,很多API的实现上是会有限制的,可能会限制你的发送的长度,也可能会把过长的数据直接以流式发送。
而SRWebSocket中实现的方式上彻底解决了数据粘包,断包的可能。
数据是通过CFStream流的方式回调回来的,每次拿到流数据,都是先放在数据缓冲区中,然后去读当前消息帧的头部,得到当前数据包的大小,然后再去创建消费者对象consumer,去读取缓冲区指定数据包大小的内容,读完才会回调给我们上层用户,所以,我们如果用SRWebSocket完全不需要考虑数据断包、粘包的问题,每次到达的数据,都是一条完整的数据。
接着我们大概来看看这个流程:
//读取CRLFCRLFBytes,直到回调回来
- (void)_readUntilHeaderCompleteWithCallback:(data_callback)dataHandler;
{
[self _readUntilBytes:CRLFCRLFBytes length:sizeof(CRLFCRLFBytes) callback:dataHandler];
}
//读取数据 CRLFCRLFBytes,边界符
- (void)_readUntilBytes:(const void *)bytes length:(size_t)length callback:(data_callback)dataHandler;
{
// TODO optimize so this can continue from where we last searched
//消费者需要消费的数据大小
stream_scanner consumer = ^size_t(NSData *data) {
__block size_t found_size = 0;
__block size_t match_count = 0;
//得到数据长度
size_t size = data.length;
//得到数据指针
const unsigned char *buffer = data.bytes;
for (size_t i = 0; i < size; i++ ) {
//匹配字符
if (((const unsigned char *)buffer)[i] == ((const unsigned char *)bytes)[match_count]) {
//匹配数+1
match_count += 1;
//如果匹配了
if (match_count == length) {
//读取数据长度等于 i+ 1
found_size = i + 1;
break;
}
} else {
match_count = 0;
}
}
//返回要读取数据的长度,没匹配成功就是0
return found_size;
};
[self _addConsumerWithScanner:consumer callback:dataHandler];
}
上面这个方法就是一个读取头部的方法,之前我写过断包粘包的文章就是用一个\r\n来分割头部和正文,这里是用了\r\n\r\n,每次读到这个标识符为止,就是读取了一个完整的WebSocket的消息帧头部。
这里我们先需要说清楚的是,数据一到达,就在stream的代理中回调中,写到了我们的_readBuffer缓冲区中去了:
case NSStreamEventHasBytesAvailable: {
SRFastLog(@"NSStreamEventHasBytesAvailable %@", aStream);
const int bufferSize = 2048;
uint8_t buffer[bufferSize];
//如果有可读字节
while (_inputStream.hasBytesAvailable) {
//读取数据,一次读2048
NSInteger bytes_read = [_inputStream read:buffer maxLength:bufferSize];
if (bytes_read > 0) {
//拼接数据
[_readBuffer appendBytes:buffer length:bytes_read];
} else if (bytes_read < 0) {
//读取错误
[self _failWithError:_inputStream.streamError];
}
//如果读取的不等于最大的,说明读完了,跳出循环
if (bytes_read != bufferSize) {
break;
}
};
//开始扫描,看消费者什么时候消费数据
[self _pumpScanner];
break;
}
接着我们来看添加消费者这个方法:
//指定数据读取
- (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback;
{
[self assertOnWorkQueue];
[self _addConsumerWithScanner:consumer callback:callback dataLength:0];
}
//添加消费者,用一个指定的长度,是否读到当前帧
- (void)_addConsumerWithDataLength:(size_t)dataLength callback:(data_callback)callback readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
{
[self assertOnWorkQueue];
assert(dataLength);
//添加到消费者队列去
[_consumers addObject:[_consumerPool consumerWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]];
[self _pumpScanner];
}
- (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback dataLength:(size_t)dataLength;
{
[self assertOnWorkQueue];
[_consumers addObject:[_consumerPool consumerWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
[self _pumpScanner];
}
其实就是添加了一个stream_scanner类型的对象,到我们的_consumers数组中去了,以后我们读取数据,都会先取出_consumers中的消费者,要读取多少,就给你从_readBuffer里去读多少数据。
//开始扫描
-(void)_pumpScanner;
{
[self assertOnWorkQueue];
//判断是否在扫描
if (!_isPumping) {
_isPumping = YES;
} else {
return;
}
//只有为NO能走到这里,开始循环检测,可读可写数据
while ([self _innerPumpScanner]) {
}
_isPumping = NO;
}
这个方法就是做这么一件事,根据consumer的要求,循环去_readBuffer中读取数据。
至于读的过程,大家可以自己去看下吧,楼主提供的源码注释里已经写的很清楚了,有点略长,这里就不放代码了,方法如下:
- (BOOL)_innerPumpScanner
{
...
}
至此我们讲了握手的头部信息的读取,与判断是否握手成功,然后数据到达是怎么从stream到_readBuffer中去的,并且简单介绍了_pumpScanner会根据消费者对象,去从_readBuffer中读取数据,读取完成并且回调consumer的handler
现在我们来讲讲一个数据从头部开始,到内容的读取过程:
每次我们读取新的一帧数据,都会调用这么个方法:
//读取新的消息帧
- (void)_readFrameNew;
{
dispatch_async(_workQueue, ^{
//清空上一帧的
[_currentFrameData setLength:0];
_currentFrameOpcode = 0;
_currentFrameCount = 0;
_readOpCount = 0;
_currentStringScanPosition = 0;
//继续读取
[self _readFrameContinue];
});
}
会清空上一帧的一些信息,然后开始当前帧的读取,我们来简单看看一个WebSocket消息帧里包含什么:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
就是这么一张图,大家应该经常见,这个图是RFC的标准规范。简单的说明下这些标识着什么:
FIN 1bit 表示信息的最后一帧,flag,也就是标记符
RSV 1-3 1bit each 以后备用的 默认都为 0
Opcode 4bit 帧类型,稍后细说
Mask 1bit 掩码,是否加密数据,默认必须置为1
Payload 7bit 数据的长度 (2^7 -1 最大到127)
Masking-key 1 or 4 bit 掩码 //用来编码数据
Payload data (x + y) bytes 数据 //
Extension data x bytes 扩展数据
Application data y bytes 程序数据
更详细的可以看看:WebSocket数据帧规范
接着我们读取消息,会用到其中的一些字段,包括FIN、 MASK、Payload len等等。
然后来看看这个读取当前消息帧的方法:
//开始读取当前消息帧
- (void)_readFrameContinue;
{
//断言要么都为空,要么都有值
assert((_currentFrameCount == 0 && _currentFrameOpcode == 0) || (_currentFrameCount > 0 && _currentFrameOpcode > 0));
//添加一个consumer,数据长度为2字节 frame_header 2个字节
[self _addConsumerWithDataLength:2 callback:^(SRWebSocket *self, NSData *data) {
//
__block frame_header header = {0};
const uint8_t *headerBuffer = data.bytes;
assert(data.length >= 2);
//判断第一帧 FIN
if (headerBuffer[0] & SRRsvMask) {
[self _closeWithProtocolError:@"Server used RSV bits"];
return;
}
//得到Qpcode
uint8_t receivedOpcode = (SROpCodeMask & headerBuffer[0]);
//判断帧类型,是否是指定的控制帧
BOOL isControlFrame = (receivedOpcode == SROpCodePing || receivedOpcode == SROpCodePong || receivedOpcode == SROpCodeConnectionClose);
//如果不是指定帧,而且receivedOpcode不等于0,而且_currentFrameCount消息帧大于0,错误关闭
if (!isControlFrame && receivedOpcode != 0 && self->_currentFrameCount > 0) {
[self _closeWithProtocolError:@"all data frames after the initial data frame must have opcode 0"];
return;
}
// 没消息
if (receivedOpcode == 0 && self->_currentFrameCount == 0) {
[self _closeWithProtocolError:@"cannot continue a message"];
return;
}
//正常读取
//得到opcode
header.opcode = receivedOpcode == 0 ? self->_currentFrameOpcode : receivedOpcode;
//得到fin
header.fin = !!(SRFinMask & headerBuffer[0]);
//得到Mask
header.masked = !!(SRMaskMask & headerBuffer[1]);
//得到数据长度
header.payload_length = SRPayloadLenMask & headerBuffer[1];
headerBuffer = NULL;
//如果是带掩码的,则报错,因为客户端是无法得知掩码的值得。
if (header.masked) {
[self _closeWithProtocolError:@"Client must receive unmasked data"];
}
size_t extra_bytes_needed = header.masked ? sizeof(_currentReadMaskKey) : 0;
//得到长度
if (header.payload_length == 126) {
extra_bytes_needed += sizeof(uint16_t);
} else if (header.payload_length == 127) {
extra_bytes_needed += sizeof(uint64_t);
}
//如果多余的需要的bytes为0
if (extra_bytes_needed == 0) {
//
[self _handleFrameHeader:header curData:self->_currentFrameData];
} else {
//读取payload
[self _addConsumerWithDataLength:extra_bytes_needed callback:^(SRWebSocket *self, NSData *data) {
size_t mapped_size = data.length;
#pragma unused (mapped_size)
const void *mapped_buffer = data.bytes;
size_t offset = 0;
if (header.payload_length == 126) {
assert(mapped_size >= sizeof(uint16_t));
uint16_t newLen = EndianU16_BtoN(*(uint16_t *)(mapped_buffer));
header.payload_length = newLen;
offset += sizeof(uint16_t);
} else if (header.payload_length == 127) {
assert(mapped_size >= sizeof(uint64_t));
header.payload_length = EndianU64_BtoN(*(uint64_t *)(mapped_buffer));
offset += sizeof(uint64_t);
} else {
assert(header.payload_length < 126 && header.payload_length >= 0);
}
if (header.masked) {
assert(mapped_size >= sizeof(_currentReadMaskOffset) + offset);
memcpy(self->_currentReadMaskKey, ((uint8_t *)mapped_buffer) + offset, sizeof(self->_currentReadMaskKey));
}
//把已读到的数据,和header传出去
[self _handleFrameHeader:header curData:self->_currentFrameData];
} readToCurrentFrame:NO unmaskBytes:NO];
}
} readToCurrentFrame:NO unmaskBytes:NO];
}
这个方法是先去读取了当前消息帧的前2个字节,大概就是这么一部分:
0 1
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6
+-+-+-+-+-------+-+-------------+
|F|R|R|R| opcode|M| Payload len |
|I|S|S|S| (4) |A| (7) |
|N|V|V|V| |S| |
| |1|2|3| |K| |
+-+-+-+-+-------+-+-------------+
然后会去对头部信息进行一些判断,但是最主要的还是去获取payload,也就是真实数据的长度,然后还是调用:
[self _addConsumerWithDataLength:extra_bytes_needed callback:^(SRWebSocket *self, NSData *data) {
...
}];
去读取真实数据的长度,然后会在下面这个方法中判断当前帧的数据是否读取完成:
- (void)_handleFrameHeader:(frame_header)frame_header curData:(NSData *)curData;
{
...
if(complete)
{
[self _handleFrameWithData:_currentFrameData opCode:frame_header.opcode];
}else{
[self _readFrameContinue];
}
...
}
如果没读取完成,会继续去读取,否则就调用完成的方法,在完成的方法中会回调暴露给我们的代理:
[self _performDelegateBlock:^{
[self.delegate webSocket:self didReceiveMessage:message];
}];
并且继续去读下一帧的数据
[self _readFrameNew];
整个数据读取过程就完成了。
接着我们来看看数据的写:
//写数据
- (void)_writeData:(NSData *)data;
{
//断言当前queue
[self assertOnWorkQueue];
//如果标记为写完成关闭,则直接返回
if (_closeWhenFinishedWriting) {
return;
}
//输出buffer拼接数据
[_outputBuffer appendData:data];
//开始写
[self _pumpWriting];
}
- (void)_pumpWriting
{
...
//写入进去,就会直接发送给对方了!这一步send
NSInteger bytesWritten = [_outputStream write:_outputBuffer.bytes + _outputBufferOffset maxLength:dataLength - _outputBufferOffset];
...
}
基本上非常简单,区别于之前CocoaAsyncSocket,读和写都没多少代码,原因是因为CocoaAsyncSocket整篇都用的是CFStream等相对上层的API。
SRWebSocket全篇代码注释地址:SRWebSocket注释。