专栏名称: AI科技大本营
迎来到AI科技大本营。这里汇集了优秀的AI学习者,技术大咖和产业领袖;提供接地气的实战课程。在这里和优秀的人一起成长。
目录
相关文章推荐
爱可可-爱生活  ·  【[261星] ... ·  21 小时前  
爱可可-爱生活  ·  【NBP:通过半自回归建模实现视频生成的创新 ... ·  昨天  
爱可可-爱生活  ·  【[15.3k星]PandasAI:用自然语 ... ·  昨天  
命里有票  ·  用DeepSeek和豆包分别计算了一个日期问 ... ·  2 天前  
命里有票  ·  用DeepSeek和豆包分别计算了一个日期问 ... ·  2 天前  
51好读  ›  专栏  ›  AI科技大本营

关于Spark NLP学习,你需要掌握的LightPipeline(附代码)| CSDN博文精选

AI科技大本营  · 公众号  · AI  · 2020-01-06 18:31

正文


作者 | Veysel Kocama n, Data Scientist & ML Researcher ANKIT CHOUDHARY
翻译 | 赵春光
校对 | 申利彬
来源 | 数据派THU
(*点击阅读原文,查看作者更多精彩文章)

【导读】Pipeline具体来说是一个多阶段的序列,每个阶段由一个Transformer或者Estimator组成。 各个阶段按顺序执行,并将输入的DataFrame转换和传递给下一个阶段。 数据如此按序的在pipeline中传递。 我们现在再来看看Spark NLP是如何使用Annotator和Transformer的。

本文是Spark NLP Library中各annotator系列中的第2篇文章, 介绍Spark NLP中是如何使用Annotator和Transformer 。如果你想更多的学习Spark NLP及对应的概念,请先阅读下述文章:
Introduction to Spark NLP: Foundations and Basic Components (Part-I)https://medium.com/spark-nlp/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59

本文主要是作为上篇文章的延续。

在机器学习中,常见的一种做法是运行一系列的算法来处理和学习数据。这种算法序列常被称作做Pipeline。

Pipeline具体来说是一个多阶段的序列,每个阶段由一个Transformer或者Estimator组成。各个阶段按顺序执行,并将输入的DataFrame转换和传递给下一个阶段,数据如此按序的在pipeline中传递。每个阶段的transform()方法函数更新这组数据集并传递到下一阶段。因为有了pipeline,训练数据和测试数据会通过确保一致的特征处理环节。

每个使用的annotator 会在pipeline中的这个data frame新添一列。

我们现在来看一下Spark NLP是如果使用Annotator和Transformer完成上述过程。假如我们需要将如下几个环节逐一施加在data frame上:

  • 文本分离成语句
  • 分词
  • 正交化
  • 得到词嵌入向量
下面是通过Spark NLP实现这个pipeline的代码:
1from pyspark.ml import Pipeline
2
3document_assembler = DocumentAssembler()\ .setInputCol(“text”)\ .setOutputCol(“document”)
4
5sentenceDetector = SentenceDetector()\ .setInputCols([“document”])\ .setOutputCol(“sentences”)
6
7tokenizer = Tokenizer() \ .setInputCols([“sentences”]) \ .setOutputCol(“token”)
8
9normalizer = Normalizer()\ .setInputCols([“token”])\ .setOutputCol(“normal”)
10
11word_embeddings=WordEmbeddingsModel.pretrained()\ .setInputCols([“document”,”normal”])\ .setOutputCol(“embeddings”)
12
13nlpPipeline = Pipeline(stages=[ document_assembler,  sentenceDetector, tokenizer, normalizer, word_embeddings, ])
14
15pipelineModel = nlpPipeline.fit(df)
接下来我们加载了一组数据到这个pipeline中,看一下模型如何工作。
Dataframe样本(5452行)

然后运行上述pipeline,我们会得到一个训练好的模型。之后我们用它转换整个DataFrame。

1result = pipelineModel.transform(df)
2result.show()
转换前20行数据用了501毫秒;转换整个data frame共用了11秒。
1%%timeresult = pipelineModel.transform(df).collect()
2>>>CPU times: user 2.01 s, sys: 425 ms, total: 2.43 s
3Wall time: 11 s
如果我们想把这个pipeline保存到硬盘,然后调用它转换一行文字,在线转换时间会多长呢?
1from pyspark.sql import Row
2text = "How did serfdom develop in and then leave Russia ?"
3line_df = spark.createDataFrame(list(map(lambda x: Row(text=x), [text])), ["text"])
4%time result = pipelineModel.transform(line_df).collect()
5
6>>>CPU times: user 31.1 ms, sys: 7.73 ms, total: 38.9 msWall time: 515 ms
转换一行短文字的时间也是515毫秒!几乎是和之前转换20行的时间一致。所以说,效果太好。实际上,类似的情况也发生在使用分布式处理小数据的时候。分布式处理和云计算主要是用来处理大数据,而使用Spark来处理小型数据其实是杀鸡用牛刀。
实际上,由于它内部的机制和优化后的构架,Spark仍适用于中等大小单机可处理的数据。但不建议使用Spark来处理仅仅是几行的数据, 除非使用Spark NLP。
打个比方,Spark 好像一个火车和一个自行车赛跑。自行车会在轻载的时候占上风,因为它更敏捷、提速更快,而重载的火车可能需要一段时间提速,但最终还是会速度更快。

所以,如果我们想要预测的时间更快该怎么办呢? 使用LightPipeline。

LightPipeline

LightPipelines 是Spark NLP对应的Pipeline, 等同于Spark ML Pipeline, 但是用于处理更小的数据。它们适用于小数据集、调试结果,或者是对一次性服务API请求的训练或预测。

Spark NLP LightPipelines 是将Spark ML Pipelines 转换成了一个单机但多线程的任务,对于小型数据(不大于5万个句子)速度会提升10倍。

这些Pipeline的使用方法是插入已训练(已拟合)的模型,然后会标注纯文本。我们都不需要把输入文字转换成Dataframe就可以输入pipeline,虽然pipeline当初是使用Dataframe作为输入。这个便捷的功能适用于使用已训练的模型对少数几行文字进行预测。

1from sparknlp.base import LightPipeline
2LightPipeline(someTrainedPipeline).annotate(someStringOrArray)
下面是一些LightPipelines可用的方法函数。我们还可以用字符列表作为输入文字。

https://nlp.johnsnowlabs.com/api/#com.johnsnowlabs.nlp.LightPipeline

我们可以很方便的创建LightPipelines,也不需要处理Spark Datasets。LightPipelines运行的也很快,而且在驱动节点工作时可执行并行运算。下面是一个应用的例子:
1from sparknlp.base import LightPipeline
2lightModel = LightPipeline(pipelineModel, parse_embeddings=True)
3%time lightModel.annotate("How did serfdom develop in and then leave Russia ?")
4>>>
5CPU times: user 12.4 ms, sys: 3.81 ms, total: 16.3 ms
6Wall time28.3 ms
7{'sentences': ['How did serfdom develop in and then leave Russia ?'],
8 'document': ['How did serfdom develop in and then leave Russia ?'],
9 'normal': ['How',
10  'did',
11  'serfdom',
12  'develop',
13  'in',
14  'and',
15  'then',
16  'leave',
17  'Russia'],
18 'token': ['How',
19  'did',
20  'serfdom',
21  'develop',
22  'in',
23  'and',
24  'then',
25  'leave',
26  'Russia',
27  '?'],
28 'embeddings': ['-0.23769 0.59392 0.58697 -0.041788 -0.86803 -0.0051122 -0.4493 -0.027985, ...]}
29

这个代码用了28毫秒!几乎是使用Spark ML Pipeline时的20倍速度。
上面可以看出,annotate只返回了result的属性。既然这个嵌入向量数组储存在embedding属性的WordEmbeddingModel标注器下,我们可以设置parse_embedding = True 来分析嵌入向量数据。否则,我们可能在输出中只能获得嵌入向量的分词属性。关于上述属性的更多信息见以下连接:
https://medium.com/spark-nlp/spark-nlp-101-document-assembler-500018f5f6b5
如果我们想获取标注的全部信息,我们还可以使用fullAnnotate()来返回整个标注内容的字典列表。
1result = lightModel.fullAnnotate("How did serfdom develop in and then leave Russia ?")
2>>>
3[{'sentences': [],
4  'document': [base
.Annotation at 0x149b5a320 >],
5  'normal': [base.Annotation at 0x139d9e940>,
6   base.Annotation at 0x139d64860>,
7   base.Annotation at 0x139d689b0>,
8   base.Annotation at 0x139dd16d8>,
9   base.Annotation at 0x139dd1c88>,
10   base.Annotation at 0x139d681d0>,
11   base.Annotation at 0x139d89128>,
12   base.Annotation at 0x139da44a8>,
13   base.Annotation at 0x139da4f98>],
14  'token': [base.Annotation at 0x149b55400>,
15   base.Annotation at 0x139dd1668>,
16   base.Annotation at 0x139dad358>,
17   base.Annotation at 0x139d8dba8>,
18   base.Annotation at 0x139d89710>,
19   base.Annotation at 0x139da4208>,
20   base.Annotation at 0x139db2f98>,
21   base.Annotation at 0x139da4240>,
22   base.Annotation at 0x149b55470>,
23   base.Annotation at 0x139dad198>],
24  'embeddings': [base.Annotation at 0x139dad208>,
25   base.Annotation at 0x139d89898>,
26   base.Annotation at 0x139db2860>,
27   base.Annotation at 0x139dbbf28>,
28   base.Annotation at 0x139dbb3c8>,
29   base.Annotation at 0x139db2208>,
30   base.Annotation at 0x139da4668>,
31   base.Annotation at 0x139dd1ba8>,
32   base.Annotation at 0x139d9e400>]}]
fullAnnotate()返回标注类型中的内容和元数据。根据参考文档,这个标定类型有如下属性:
参考文档: https://nlp.johnsnowlabs.com/api/#com.johnsnowlabs.nlp.Annotation
1annotatorType: String,
2beginInt,
3endInt,
4resultString, (this is what annotate returns)
5metadata: Map[StringString],
6embeddings: Array[Float]
所以,下面的代码可以返回一个句子的起始或者结束:
1result[0]['sentences'][0].begin
2>> 0
3result[0]['sentences'][0].end
4>> 49
5result[0]['sentences'][0].result
6>> 'How did serfdom develop in and then leave Russia ?'
嵌入向量每个分词的的元数据也可以得到:
1result[0]['embeddings'






请到「今天看啥」查看全文