专栏名称: 狗厂
目录
相关文章推荐
51好读  ›  专栏  ›  狗厂

Go并发实战: 搭配 influxdb + grafana 高性能实时日志监控系统

狗厂  · 掘金  ·  · 2018-05-07 05:44

正文

date: 2018-4-27 16:22:14
title: go| go并发实战: 搭配 influxdb + grafana 高性能实时日志监控系统
description: go并发实战: 搭配 influxdb + grafana 高性能实时日志监控系统

Go并发编程案例解析

继续好玩的并发编程实战, 上一篇 go| 感受并发编程的乐趣 前篇 .

实战内容: 实时处理读取/解析日志文件, 搭配 influxdb(时序数据库) 存储, grafana 展示, 并提供系统的简单监控.

0x00: 初始化, 面向过程编程

用面向过程的方式, 对问题进行简单的梳理, 代码如下:

package main

func main() {
    // read log file

    // process log

    // write data
}

这里并没有写具体的实现, 因为到这里, 我们就可以开始考虑 封装

0x01: 过程封装, 使用 LogPorcess 结构体

引入 LogProcess 结构体, 将整个任务 面向对象 化, 伪代码如下:

package main

import (
    "fmt"
    "strings"
)

type LogProcess struct {
    path string // 日志文件路径
    dsn string // influxdb dsn
}

func (lp *LogProcess) Read() {
    path := lp.path
    fmt.Println(path)
}

func (lp *LogProcess) Process() {
    log := "hello world"
    fmt.Println(strings.ToUpper(log))
}

func (lp *LogProcess) Write()  {
    dsn := lp.dsn
    fmt.Println(dsn)
}

func main() {
    lp := &LogProcess{
        path: "test path",
        dsn: "test dsn",
    }

    // read log file
    lp.Read()

    // process log
    lp.Process()

    // write data
    lp.Write()
}

0x02: 加上 go 和 chan, 并发就是如此简单

加上 go 关键字, 轻松实现协程:

func main() {
    lp := &LogProcess{
        path: "test path",
        dsn: "test dsn",
    }

    // read log file
    go lp.Read()

    // process log
    go lp.Process()

    // write data
    go lp.Write()

    time.Sleep(time.Second) // 新手必知: 保证程序退出前, 协程可以执行完
}

加上 chan, 轻松实现协程间通信:

type LogProcess struct {
    path string // 日志文件路径
    dsn string // influxdb dsn
    rc chan string // read chan
    wc chan string // write chan
}

func (lp *LogProcess) Read() {
    path := lp.path
    fmt.Println(path)

    lp.rc <- "test data"
}

func (lp *LogProcess) Process() {
    log := <- lp.rc
    lp.wc <- strings.ToUpper(log)
}

func (lp *LogProcess) Write()  {
    dsn := lp.dsn
    fmt.Println(dsn)

    data := <- lp.wc
    fmt.Println(data)
}

0x03: 引入 interface, 方便以后扩展

现在是从 文件 读取, 如果以后要从 其他数据源 读取呢? 这个时候就可以用上接口:

type Reader interface {
    Read(rc chan string)
}

type ReadFromFile struct {
    path string
}

func (r *ReadFromFile) Read(rc chan string) {
    // read from file
}

同理, 数据写入到 influxdb 也可以加入接口, 方便以后扩展.

0x04: 读取文件的细节

实时读取日志文件要怎么实现呢? 直接上代码, 细节有很多, 注意 注释 :

  • 实时 读取怎么实现: 从文件末尾开始读取
  • 怎么一行一行的读取日志: buf.ReadBytes('\n')
  • 输出怎么多了换行呢: 截取掉最后的换行符 line[:len(line)-1]
func (r *ReadFromFile) Read(rc chan []byte) {
    f, err := os.Open(r.path)
    if err != nil {
        panic(err)
    }
    defer f.Close()

    f.Seek(0, 2) // 文件末尾
    buf := bufio.NewReader(f) // []byte 数据类型, rc chan 的类型也相应进行了修改

    for {
        line, err := buf.ReadBytes('\n')
        // todo: 处理日志切割, inode 变化的情况
        if err == io.EOF {
            time.Sleep(500 * time.Millisecond)
        } else if err != nil {
            panic(err)
        } else { // 需要写到这里
            rc <- line[:len(line)-1]
        }
    }
}

还有一个需要优化的地方, 一般日志文件都会采取 轮转 策略(详见上篇blog devops| 日志服务实践 ), 文件可能更新了, 所以读取文件时, 还需要加一个判断.

0x05: 日志解析, 又见正则

日志的解析比较简单, 按照日志的格式正则匹配即可:

// 使用结构体来记录匹配到的日志数据
type Log struct {
    TimeLocal                    time.Time
    BytesSent                    int
    Path, Method, Scheme, Status string
    UpstreamTime, RequestTime    float64
}

func (l *LogProcess) Process() {
    // 正则
    re := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(
\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([d\.-]+)`)

    loc, _ := time.LoadLocation("PRC")
    for v := range l.rc {
        str := string(v)
        ret := re.FindStringSubmatch(str)
        if len(ret) != 14 {
            log.Println(str)
            continue
        }

        msg := &Log{}
        t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
        if err != nil {
            log.Println(ret[4])
        }
        msg.TimeLocal = t

        byteSent, _ := strconv.Atoi(ret[8])
        msg.BytesSent = byteSent

        // Get /for?query=t HTTP/1.0
        reqSli := strings.Split(ret[6], " ")
        if len(reqSli) != 3 {
            log.Println(ret[6])
            continue
        }
        msg.Method = reqSli[0]
        msg.Scheme = reqSli[2]
        // url parse
        u, err := url.Parse(reqSli[1])
        if err != nil {
            log.Println(reqSli[1])
            continue
        }
        msg.Path = u.Path
        msg.Status = ret[7]
        upTime, _ := strconv.ParseFloat(ret[12], 64)
        reqTime, _ := strconv.ParseFloat(ret[13], 64)
        msg.UpstreamTime = upTime
        msg.RequestTime = reqTime

        l.wc <- msg
    }
}

0x06: 上手 influxdb

influxdb 是时序数据库的一种, 包含如下基础概念:







请到「今天看啥」查看全文