专栏名称: HULK一线技术杂谈
HULK是360的私有云平台,丰富的一线实战经验,为你带来最有料的技术分享
目录
相关文章推荐
中国城市规划  ·  地方两会 | ... ·  2 天前  
中国城市规划  ·  学术报告 | ... ·  昨天  
中国交建  ·  孕育之冬丨中交集团中标这些项目⑭ ·  2 天前  
中国交建  ·  红心向党 ... ·  2 天前  
中国中铁  ·  佳节守护民生 润泽千家万户 ·  2 天前  
51好读  ›  专栏  ›  HULK一线技术杂谈

MongoDB 新功能介绍-Change Streams

HULK一线技术杂谈  · 公众号  ·  · 2018-07-31 19:00

正文

女主宣言

MongoDB 3.6已经GA有一段时间,网络上对于该版本新特性的详细介绍文章比较少为此借机会对部分新特性做一个相对详细的介绍。基于早期MongoDB版本实现如跨平台数据同步、消息通知、ETL及oplog备份等服务时大多依赖于 Tailable Cursors 的方式。当然这样的实现一来相对复杂同时也存在着一些风险(如不同版本oplog兼容性及过滤特定操作类型等)。


Change streams(暂且叫变更流)的出现不仅为业务提供了实时获取数据库数据变化的简易接口,同时又避免了原来使用tail oplog 的复杂和风险性。下面我们来看看如何来正确使用 Change stream 。

PS:丰富的一线技术、多元化的表现形式,尽在“ HULK一线技术杂谈 ”,点关注哦!

Change stream

使用条件限制

只用于 replica sets 和 sharded clusters ,单节点因为没有oplog故不支持。

复制协议必须是pv1 存储引擎必须是 WiredTiger


驱动实现接口

MongoDB Shell 接口说明

MongoDB 3.6 版本只实现了集合粒度的 change stream 具体方法如下:

db.collection.watch(pipeline, options)


该方法实际上是在集合collection上开启一个change stream的游标。

测试用例(mongo shell环境+副本集primary节点):

1

创建一个简单 Change Stream 游标并进行循环迭代

// 在test库的test集合上创建一个名为watchCursor 的change stream 游标

watchCursor = db.getSiblingDB("test").test.watch();

// 对游标watchCursor进行循环迭代(其中当游标关闭或游标迭代没有文档时isExhausted()返回true)

while (!watchCursor.isExhausted()){

if (watchCursor.hasNext()){

a=watchCursor.next();

printjson(a);

}

}

// 开启另一个会话在test库下的test集合执行update操作

db.test.update({x:100},{$set:{age:80}},{upsert:true});

输出结果及详细说明如下:

{

"_id" : {  // 表示更新操作的token 值(映射至对应操作的oplog)

"_data":BinData(0,"glsn32QAAAABRmRfaWQAZFsn3vA

7Q4yjQzA+1wBaEAQwkZh988FJS5yreqLRyy/wBA==")

},

"operationType" : "update", // 捕获的具体操作类型

// 输出更新后整个文档的详细信息

// 前提条件是在创建ChangeStream游标是指定了fullDocument : "updateLookup"

"fullDocument" : {

"_id" : ObjectId("5b27e2453b438ca343304236"),

"x" : 100,

"age" : 80,

"name" : "li"

},

"ns" : {

"db" : "test",// 对应的库名

"coll" : "test"// 对应的集合

},

"documentKey" : {

// 操作对应记录的_id,如果是分片集合此处还会输出对应的分片key

"_id" : ObjectId("5b27def03b438ca343303ed7")

},

"updateDescription" : { // 描述了操作后记录影响的具体增量信息

"updatedFields" : { // 增量操作(这里是update)所影响的字段

"age" : 80 // 增量操作(这里是更新后)具体字段的值

},

"removedFields" : [ ] //该字段描述了update操作后被删除的字段信息

}

}


2

创建一个只匹配 insert 操作类型的 Change Stream 游标

watchCursor=db.getSiblingDB("test").test.watch(

[

{ $match : {"operationType" : "insert" } }// 只匹配insert 操作的变更

]

);

游标创建后通过对游标进行迭代,只能获取test集合上insert操作类型的信息。其他支持的操作类型update、delete、replaceOne 及输出信息详细说明可参见:Change Events

https://docs.mongodb.com/manual/reference/change-events/


3

ChangeStream 的”断线恢复”功能

ChangeStream还支持”断线恢复”功能即当游标因为意外情况关闭后可以通过之前的token信息进行恢复(前提条件是token对应的oplog没有被覆盖),具体使用如下:

var resumeToken={

// 该token 信息可是是之前任意有效操作的输出

"_data" :   BinData(0,"glsn32QAAAABRmRfaWQAZFsn3vA7Q

4yjQzA+1wBaEAQwkZh988FJS5yreqLRyy/wBA==")};

var resumedWatchCursor=db.getSiblingDB("test").test.watch(

[],

{ resumeAfter : resumeToken } // 指定对应的token之后开始恢复游标

);


其他的使用场景,读者自行测试即可。



注意事项

1.尝试在单节点(非副本集节点)上创建ChangeStream游标会报如下错误:

command failed: {

"ok" : 0,

"errmsg" : "The $changeStream stage is only supported on replica sets",

"code" : 40573,

"codeName" : "Location40573"

}


2. ChangeStream 只发布持久化到大多数(majority-committed)节点的数据变化通知


3.要想在集合上创建ChangeStream游标用户必须对集合具有读权限


4.对于分片集合带有multi:true 的更新操作可能会导致发布孤立文档的变更消息


5.对于如创建索引的操作游标迭代时直接忽略该操作但是如果 dropDatabase 或对集合进行 rename、drop 操作则会触发游标退出并输出如下信息:

{

"_id" : {

"_data":BinData(0,"glsn6TQAAAABFFoQBI7fTw7hk0LHgHqi0QTIvq0E")

},

"operationType" : "invalidate" // 表示无效或叫非法操作

}


6. 当 ChangeStream 游标因特定操作导致退出后,Mongo Shell 下不会自动恢复,而对于3.6版本系列的各语言驱动则会尝试一次自动恢复。


7. 当对应的 token 信息对应的 oplog 不存在然后尝试恢复ChangeStream 游标时不会报错但尝试对集合进行数据操作后会报如下错:

getMore command failed:{

"operationTime" : Timestamp(1528994552, 1),

"ok" : 0,

"errmsg" : "resume of change stream was not possible, as the resume token was not found

….

}








请到「今天看啥」查看全文