/**
 * Creates a direct Kafka stream that starts reading from already-known offsets.
 *
 * If building the stream from `fromOffsets` throws a non-fatal error, the method
 * falls back to `getDirectStream` for the set of topics named in `fromOffsets`.
 *
 * @param ssc         the StreamingContext the stream is created on
 * @param kafkaParams Kafka configuration parameters
 * @param fromOffsets previously stored offset for each topic/partition
 * @return a stream containing the message payload of every record
 */
private def getDirectStreamWithOffsets(ssc: StreamingContext,
                                       kafkaParams: Map[String, String],
                                       fromOffsets: Map[TopicAndPartition, Long]): DStream[String] =
  try {
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc,
      kafkaParams,
      fromOffsets,
      // Keep only the message payload; key and metadata are dropped.
      (mmd: MessageAndMetadata[String, String]) => mmd.message()
    )
  } catch {
    // Original author's intent: the stored offsets are no longer valid, so read
    // from the latest offsets instead (getDirectStream is defined elsewhere;
    // its starting position is not visible here).
    // NonFatal is used rather than Exception so that InterruptedException and
    // JVM-fatal errors propagate instead of silently triggering the fallback.
    // NOTE(review): the cause is discarded without logging, and invalid offsets
    // may only surface once batches run rather than at stream creation --
    // confirm that this fallback is actually reachable for that case.
    case scala.util.control.NonFatal(_) =>
      val topics = fromOffsets.keys.map(_.topic).toSet
      getDirectStream(ssc, kafkaParams, topics)
  }