正文
date: 2018-4-27 16:22:14
title: go| go并发实战: 搭配 influxdb + grafana 高性能实时日志监控系统
description: go并发实战: 搭配 influxdb + grafana 高性能实时日志监控系统
Go并发编程案例解析
继续好玩的并发编程实战, 上一篇
go| 感受并发编程的乐趣 前篇
.
实战内容: 实时处理读取/解析日志文件, 搭配 influxdb(时序数据库) 存储, grafana 展示, 并提供系统的简单监控.
0x00: 初始化, 面向过程编程
用面向过程的方式, 对问题进行简单的梳理, 代码如下:
package main
func main() {
// read log file
// process log
// write data
}
这里并没有写具体的实现, 因为到这里, 我们就可以开始考虑
封装
了
0x01: 过程封装, 使用 LogPorcess 结构体
引入 LogProcess 结构体, 将整个任务
面向对象
化, 伪代码如下:
package main
import (
"fmt"
"strings"
)
type LogProcess struct {
path string // 日志文件路径
dsn string // influxdb dsn
}
func (lp *LogProcess) Read() {
path := lp.path
fmt.Println(path)
}
func (lp *LogProcess) Process() {
log := "hello world"
fmt.Println(strings.ToUpper(log))
}
func (lp *LogProcess) Write() {
dsn := lp.dsn
fmt.Println(dsn)
}
func main() {
lp := &LogProcess{
path: "test path",
dsn: "test dsn",
}
// read log file
lp.Read()
// process log
lp.Process()
// write data
lp.Write()
}
0x02: 加上 go 和 chan, 并发就是如此简单
加上 go 关键字, 轻松实现协程:
func main() {
lp := &LogProcess{
path: "test path",
dsn: "test dsn",
}
// read log file
go lp.Read()
// process log
go lp.Process()
// write data
go lp.Write()
time.Sleep(time.Second) // 新手必知: 保证程序退出前, 协程可以执行完
}
加上 chan, 轻松实现协程间通信:
type LogProcess struct {
path string // 日志文件路径
dsn string // influxdb dsn
rc chan string // read chan
wc chan string // write chan
}
func (lp *LogProcess) Read() {
path := lp.path
fmt.Println(path)
lp.rc <- "test data"
}
func (lp *LogProcess) Process() {
log := <- lp.rc
lp.wc <- strings.ToUpper(log)
}
func (lp *LogProcess) Write() {
dsn := lp.dsn
fmt.Println(dsn)
data := <- lp.wc
fmt.Println(data)
}
0x03: 引入 interface, 方便以后扩展
现在是从
文件
读取, 如果以后要从
其他数据源
读取呢? 这个时候就可以用上接口:
type Reader interface {
Read(rc chan string)
}
type ReadFromFile struct {
path string
}
func (r *ReadFromFile) Read(rc chan string) {
// read from file
}
同理, 数据写入到
influxdb
也可以加入接口, 方便以后扩展.
0x04: 读取文件的细节
实时读取日志文件要怎么实现呢? 直接上代码, 细节有很多, 注意
注释
:
-
实时
读取怎么实现: 从文件末尾开始读取
-
怎么一行一行的读取日志:
buf.ReadBytes('\n')
-
输出怎么多了换行呢: 截取掉最后的换行符
line[:len(line)-1]
func (r *ReadFromFile) Read(rc chan []byte) {
f, err := os.Open(r.path)
if err != nil {
panic(err)
}
defer f.Close()
f.Seek(0, 2) // 文件末尾
buf := bufio.NewReader(f) // []byte 数据类型, rc chan 的类型也相应进行了修改
for {
line, err := buf.ReadBytes('\n')
// todo: 处理日志切割, inode 变化的情况
if err == io.EOF {
time.Sleep(500 * time.Millisecond)
} else if err != nil {
panic(err)
} else { // 需要写到这里
rc <- line[:len(line)-1]
}
}
}
还有一个需要优化的地方, 一般日志文件都会采取
轮转
策略(详见上篇blog
devops| 日志服务实践
), 文件可能更新了, 所以读取文件时, 还需要加一个判断.
0x05: 日志解析, 又见正则
日志的解析比较简单, 按照日志的格式正则匹配即可:
// 使用结构体来记录匹配到的日志数据
type Log struct {
TimeLocal time.Time
BytesSent int
Path, Method, Scheme, Status string
UpstreamTime, RequestTime float64
}
func (l *LogProcess) Process() {
// 正则
re := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(
\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([d\.-]+)`)
loc, _ := time.LoadLocation("PRC")
for v := range l.rc {
str := string(v)
ret := re.FindStringSubmatch(str)
if len(ret) != 14 {
log.Println(str)
continue
}
msg := &Log{}
t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
if err != nil {
log.Println(ret[4])
}
msg.TimeLocal = t
byteSent, _ := strconv.Atoi(ret[8])
msg.BytesSent = byteSent
// Get /for?query=t HTTP/1.0
reqSli := strings.Split(ret[6], " ")
if len(reqSli) != 3 {
log.Println(ret[6])
continue
}
msg.Method = reqSli[0]
msg.Scheme = reqSli[2]
// url parse
u, err := url.Parse(reqSli[1])
if err != nil {
log.Println(reqSli[1])
continue
}
msg.Path = u.Path
msg.Status = ret[7]
upTime, _ := strconv.ParseFloat(ret[12], 64)
reqTime, _ := strconv.ParseFloat(ret[13], 64)
msg.UpstreamTime = upTime
msg.RequestTime = reqTime
l.wc <- msg
}
}
0x06: 上手 influxdb
influxdb 是时序数据库的一种, 包含如下基础概念: