在上一篇文章中,我们分析了保存文件/内容的整体流程,基本上知道在这个过程中文件/内容是怎么处理的。
但是,还留下了一个疑问,就是文件是怎么分片的,又是怎么保存到本地系统。
这篇文章我们就来解决这几个问题。
通过上一篇文章,我们知道
ipfs-unixfs-importer
这个类库,它实现了 IPFS 用于处理文件的布局和分块机制。
它的
index.js
文件内容只有一行代码
require('./importer')
,接下来我们直接来看这个
importer/index.js
是怎么处理的。
把参数传递的选项和默认选项进行合并,生成新的选项,然后检查选项的相关配置。
const options = Object.assign({}, defaultOptions, _options)
options.cidVersion = options.cidVersion || 0
if (options.cidVersion > 0 && _options.rawLeaves === undefined) {
options.rawLeaves = true
}
if (_options && _options.hash !== undefined && _options.rawLeaves === undefined) {
options.rawLeaves = true
}
默认选项即
defaultOptions
内容如下:
const defaultOptions = {
chunker: 'fixed',
rawLeaves: false,
hashOnly: false,
cidVersion: 0,
hash: null,
leafType: 'file',
hashAlg: 'sha2-256'
}
根据选项中指定的分割方式,从 IPFS 中提供的所有分割方法找到对应的分割对象。
const Chunker = chunkers[options.chunker]
chunkers
表示系统提供的所有分割方法对象。
在父目录下
chunker/index.js
文件中定义的,默认有
fixed
、
rabin
两种方法,默认使用的是的前者,即固定大小。
生成一个 pull-through 的双向流,双向流的意思就是即可以从它读取数据,又可以提供数据让其它流读取。
const entry = {
sink: writable(
(nodes, callback) => {
pending += nodes.length
nodes.forEach((node) => entry.source.push(node))
setImmediate(callback)
},
null,
1,
(err) => entry.source.end(err)
),
source: pushable()
}
source
流是 pull-pushable 类库提供的一个可以其它流主动 push 的 pull-stream 源流,它提供了一个
push
方法。
当调用这个方法时,它开始调用回调函数,从而把数据传递给后续的 through 或 sink。
当时,它还提供了一个
end
方法,当数据读取完成后,调用这个方法。
sink
流是 pull-write 类库提供的一个创建通用 pull-stream
sinks
流的基础类。
它的签名如下:
(write, reduce, max, cb)
, 因为它是一个
sinks
流,所以它会读取前一个流的数据,在读取到数据之后就调用它的
write
方法保存读取到的数据,如果数据读取完成就调用它的
cb
方法。
在这里
sink
函数从前一个流中读取数据,然后放入
source
中。同时,
source
成为下一个流的读取函数。
生成一个
dagStream
对象,这个对象也是一个
{source,sink}
对象。
const dagStream = DAGBuilder(Chunker, ipld, options)
DAGBuilder
函数定义于父目录下的
builder/index.js
中,接下来我们看下这个执行过程:
第 1 个参数
Chunker/createChunker
,它表示具体分割内容的策略,默认情况下为
fixed
,详见第一步中的
defaultOptions
变量内容;
第 2 个参数
ipld/ipld
,这个是 IPFS 对象的
_ipld
属性,在 IPFS 对象创建时生成的,表示星际接续的数据,目前它可以连接比特币、以太坊、git、zcash 等,在 IPFS 体系中具有非常重要的位置;
第 3 个参数
reducer/createReducer
是具体的 reduce 策略,默认情况为
balanced
,详见第四步中生成
reducer
变量的过程。
第 4 个参数
options/_options
为选项。
合并指定的选项和自身默认的选项。
const options = extend({}, defaultOptions, _options)
默认选项如下:
const defaultOptions = {
chunkerOptions: {
maxChunkSize: 262144,
avgChunkSize: 262144
},
rawLeaves: false,
hashAlg: 'sha2-256',
leafType: 'file',
cidVersion: 0,
progress: () => {}
}
返回一个函数对象。
return function (source) {
return function (items, cb) {
parallel(items.map((item) => (cb) => {
if (!item.content) {
return createAndStoreDir(item, (err, node) => {
if (err) {
return cb(err)
}
if (node) {
source.push(node)
}
cb()
})
}
createAndStoreFile(item, (err, node) => {
if (err) {
return cb(err)
}
if (node) {
source.push(node)
}
cb()
})
}), cb)
}
}
合并选项参数和默认选项
const options = Object.assign({}, defaultOptions, _options)
默认选项如下:
const defaultOptions = {
strategy: 'balanced',
highWaterMark: 100,
reduceSingleLeafToSelf: true
}
根据选项指定的 reduce 策略,从系统提供的多个策略中选择指定的策略。
const strategyName = options.strategy
const reducer = reducers[strategyName]
系统定义的的策略如下:
const reducers = {
flat: require('./flat'),
balanced: require('./balanced'),
trickle: require('./trickle')
}
在用户不指定具体策略的默认情况下,根据前面执行过程,最终选定的策略为
balanced
。
调用
Builder
方法创建最终的策略对象。
const createStrategy = Builder(Chunker, ipld, reducer, options)
Builder
方法位于
builder.js
文件中,它会创建一个 pull-stream 的 through 流对象。
在看它的内部之前,我们首先看下的 4个参数。看完参数,接下来,我们看下它的执行逻辑。
返回的这个函数,最终成为了一个
sink
流的
write
方法。
调用
createBuildStream
方法,生成一个双向流对象。
createBuildStream(createStrategy, ipld, options)
createBuildStream
方法位于
create-build-stream.js
文件中,代码如下:
const source = pullPushable()
const sink = pullWrite(
createStrategy(source),
null,
options.highWaterMark,
(err) => source.end(err)
)
return {
source: source,
sink: sink
}
在这段代码中,
source
流是 pull-pushable 类库提供的一个可以主动 push 到其它流的 pull-stream 源流,这个类库在前面我们已经分析过,这里就直接略过。
sink
流是 pull-write 类库提供的一个创建通用 pull-stream
sinks
流的基础类,这个类库也在前面分析过,这里也不细讲,我们只看下它的
write
方法。
这里的
createStrategy
函数正是调用
Builder
方法返回的
createStrategy
函数,用
source
作为参数,调用它,用返回的第二层匿名函数作为
write
方法。
生成一个树构建器流对象,并返回其双向流对象。
const treeBuilder = createTreeBuilder(ipld, options)
const treeBuilderStream = treeBuilder.stream()
createTreeBuilder
函数位于
tree-builder.js
文件中,我们来看它的执行逻辑。
首先,合并默认选项对象和指定的选项对象。
const options = Object.assign({}, defaultOptions, _options)
默认选择对象如下:
const defaultOptions = {
wrap: false,
shardSplitThreshold: 1000,
onlyHash: false
}
onlyHash
表示是否不保存文件/内容,只计算其哈希。
创建一个队列对象。
const queue = createQueue(consumeQueue, 1)
创建一个双向流对象
let stream = createStream()
其中
sink
对象是一个
pull-write
类库提供的流,这个已经见过多次了。
它的
write
方法后面遇到时再来看,
source
是一个
pull-pushable
类库提供的流,这个也见过多次。
创建一个
DirFlat
对象。
let tree = DirFlat({
path: '',
root: true,
dir: true,
dirty: false,
flat: true
}, options)
返回特权函数构成的对象。
return {
flush: flushRoot,
stream: getStream
}
创建一个暂停流。这里什么也不做。
调用
pull
方法,创建一个完整的流来保存文件内容。
pull(
entry,
pausable,
dagStream,
map((node) => {
pending--
if (!pending) {
process.nextTick(() => {
while (waitingPending.length) {
waitingPending.shift()()
}
})
}
return node
}),
treeBuilderStream
)
pull 函数是 pull-stream 是类库中的核心函数。
在它的执行过程中,最后的 sink 流通过依次调用前面的 through 流,最终从最前面的 source 流中拉取数据。
除了最前面的 Source 流和最后面的 Sink 流,中间的都是 through 流,它们即可以被后面的流调用以提供数据,也可以调用前面的流来读取数据。
当
pull
函数在调用某个参数从前面读取数据时,如果当前参数是一个对象(即双向流)时,那么就会调用它的
sink
方法来读取。
同时用它的
source
方法作为后面参数的读取方法。
下面我们分析这段代码中的几个流,它们太重要了。
首先是
entry
流,它是一个双向流,它的
sink
函数(类型为
pull-write
流)调用前一个流的
read
方法来读取数据,并把读取到的数据放在
source
中(类型为 pull-pushable )。
然后是
dagStream
流,它也是一个双向流,它的
sink
函数(类型为
pull-write
流)调用
entry
流的
source
方法来读取数据。
sink
函数的异步写函数参数为
builder.js
中返回的第二层函数,当读取到数据之后,调用
builder.js
中返回的第二层函数进行处理,在第二层函数中,大致流程是把数据保存自身的
source
中(类型为 pull-pushable )。
dagStream 在
create-build-stream.js
中生成。为了方便理解,这里我们再次看下它的代码。
const source = pullPushable()
const sink = pullWrite( createStrategy(source), null, options.highWaterMark, (err) => source.end(err) )
return { source: source, sink: sink }
最后是
treeBuilderStream
流,它也是一个双向流,它的
sink
函数(类型为
pull-write
流)调用
dagStream
流的
source
方法来读取数据,并把读取到的数据放在
source
中(类型为 pull-pushable )。
其他两个流对流程没有任何影响,读者可以自行分析,这里略过不提。
在这一步中,通过
pull
函数把最重要的几个流连接到一起,并通过下面最后一步,把它们与外部的流联系到一起。
最后,返回双向流对象。
{
sink: entry.sink,
source: treeBuilderStream.source,
flush: flush
}
到这里,文件已经保存完成了。
啥?文件已经保存完成了?
什么都没看到就把文件保存完了,不会骗我们的吧?
哈,因为保存文件这个动作太复杂了,所以上面只是静态的从代码层面进行梳理。
下面我们从头到尾从动态处理的过程来看下文件到底是怎么保存在本地的。
一切要从我们在上篇写的这个示例说起
const {createNode} = require('ipfs')
const node = createNode({
libp2p:{
config:{
dht:{
enabled:true
}
}
}
})
node.on('ready', async () => {
const content = `我爱黑萤`;
const filesAdded = await node.add({
content: Buffer.from(content)
},{
chunkerOptions:{
maxChunkSize:1000,
avgChunkSize:1000
}
})
console.log('Added file:', filesAdded[0].path, filesAdded[0].hash)
})
上面这段代码,最终执行的是
core/components/files-regular/add-pull-stream.js
文件中的函数,它的主体就是下面的这段代码:
pull(
pull.map(content => normalizeContent(content, opts)),
pull.flatten(),
importer(self._ipld, opts),
pull.asyncMap((file, cb) => prepareFile(file, self, opts, cb)),
pull.map(file => preloadFile(file, self, opts)),
pull.asyncMap((file, cb) => pinFile(file, self, opts, cb))
)
为了便于分析理解,我们在分析过程中仍然使用推的方式,从源流推到目的流中,注意这个仅是为了理解方便,真实的过程是目的流从源流中拉取数据。
下面代码简单解释如下:
-
首先,调用第一个
pull.map
流,对收到的文件或内容并进行一些必要的转换,
-
调用
pull.flatten
流,把前一步生成的数组进行扁平化处理。
-
调用
importer
流来保存内容。
-
调用
pull.asyncMap
方法,对已经保存的文件/内容进行预处理,生成用户看到的内容。
-
调用
pull.map
方法,把已经保存到本地的文件预加载到指定节点。
-
调用
pull.asyncMap
方法,把已经保存到本地的文件长期保存在本地,确保不被垃圾回收。
下面我们重点看下文件内容在
importer
流中的处理逻辑。
1,调用
entry.sink
函数从前面的
pull.flatten
流中读取保存的每一个文件/内容。
2,调用
dagStream.sink
函数从前面的流中读取数据,并在读取到数据之后,调用
builder.js
中定义的第二层匿名函数进行处理。
在这个函数中,调用异步流程库
async
的
parallel
方法对收到的每个要处理的文件内容进行处理。
具体处理如下:如果保存的是目录,那么调用
createAndStoreDir