正文
再次整理了一下这个日志收集系统的框,如下图
这次要实现的代码的整体逻辑为:
完整代码地址为:
github.com/pythonsite/…
etcd介绍
高可用的分布式key-value存储,可以用于配置共享和服务发现
类似的项目:zookeeper和consul
开发语言:go
接口:提供restful的接口,使用简单
实现算法:基于raft算法的强一致性,高可用的服务存储目录
etcd的应用场景:
-
服务发现和服务注册
-
配置中心(我们实现的日志收集客户端需要用到)
-
分布式锁
-
master选举
官网对etcd的有一个非常简明的介绍:
etcd搭建:
下载地址:
github.com/coreos/etcd…
根据自己的环境下载对应的版本然后启动起来就可以了
启动之后可以通过如下命令验证一下:
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]#
context 介绍和使用
其实这个东西翻译过来就是上下文管理,那么context的作用是做什么,主要有如下两个作用:
通过下面一个简单的例子进行理解:
package main
import (
"fmt"
"time"
"net/http"
"context"
"io/ioutil"
)
type Result struct{
r *http.Response
err error
}
func process(){
ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
defer cancel()
tr := &http.Transport{}
client := &http.Client{Transport:tr}
c := make(chan Result,1)
req,err := http.NewRequest("GET","http://www.google.com",nil)
if err != nil{
fmt.Println("http request failed,err:",err)
return
}
// 如果请求成功了会将数据存入到管道中
go func(){
resp,err := client.Do(req)
pack := Result{resp,err}
c <- pack
}()
select{
case <- ctx.Done():
tr.CancelRequest(req)
fmt.Println("timeout!")
case res := <-c:
defer res.r.Body.Close()
out,_:= ioutil.ReadAll(res.r.Body)
fmt.Printf("server response:%s",out)
}
return
}
func main() {
process()
}
写一个通过context保存上下文,代码例子如:
package main
import (
"github.com/Go-zh/net/context"
"fmt"
)
func add(ctx context.Context,a,b int) int {
traceId := ctx.Value("trace_id").(string)
fmt.Printf("trace_id:%v\n",traceId)
return a+b
}
func calc(ctx context.Context,a, b int) int{
traceId := ctx.Value("trace_id").(string)
fmt.Printf("trace_id:%v\n",traceId)
//再将ctx传入到add中
return add(ctx,a,b)
}
func main() {
//将ctx传递到calc中
ctx := context.WithValue(context.Background(),"trace_id","123456")
calc(ctx,20,30)
}
结合etcd和context使用
关于通过go连接etcd的简单例子:(这里有个小问题需要注意就是etcd的启动方式,默认启动可能会连接不上,尤其你是在虚拟你安装,所以需要通过如下命令启动:
./etcd --listen-client-urls
http://0.0.0.0:2371
--advertise-client-urls
http://0.0.0.0:2371
--listen-peer-urls
http://0.0.0.0:2381
)
package main
import (
etcd_client "github.com/coreos/etcd/clientv3"
"time"
"fmt"
)
func main() {
cli, err := etcd_client.New(etcd_client.Config{
Endpoints:[]string{"192.168.0.118:2371"},
DialTimeout:5*time.Second,
})
if err != nil{
fmt.Println("connect failed,err:",err)
return
}
fmt.Println("connect success")
defer cli.Close()
}
下面一个例子是通过连接etcd,存值并取值
package main
import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
func main() {
cli,err := clientv3.New(clientv3.Config{
Endpoints:[]string{"192.168.0.118:2371"},
DialTimeout:5*time.Second,
})
if err != nil{
fmt.Println("connect failed,err:",err)
return
}
fmt.Println("connect succ")
defer cli.Close()
ctx,cancel := context.WithTimeout(context.Background(),time.Second)
_,err = cli.Put(ctx,"logagent/conf/","sample_value")
cancel()
if err != nil{
fmt.Println("put failed,err",err)
return
}
ctx, cancel = context.WithTimeout(context.Background(),time.Second)
resp,err := cli.Get(ctx,"logagent/conf/")
cancel()
if err != nil{
fmt.Println("get failed,err:",err)
return
}
for _,ev := range resp.Kvs{
fmt.Printf("%s:%s\n",ev.Key,ev.Value)
}
}
关于context官网也有一个例子非常有用,用于控制开启的goroutine的退出,代码如下:
package main
import (
"context"
"fmt"
)
func main() {
// gen generates integers in a separate goroutine and
// sends them to the returned channel.
// The callers of gen need to cancel the context once
// they are done consuming generated integers not to leak
// the internal goroutine started by gen.
gen := func(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for {
select {
case <-ctx.Done():
return // returning not to leak the goroutine
case dst <- n:
n++
}
}
}()
return dst
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel when we are finished consuming integers
for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
break
}
}
}
关于官网文档中的WithDeadline演示的代码例子:
package main
import (
"context"
"fmt"
"time"
)
func main() {
d := time.Now().Add(50 * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
// Even though ctx will be expired, it is good practice to call its
// cancelation function in any case. Failure to do so may keep the
// context and its parent alive longer than necessary.
defer cancel()
select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err())
}
}
通过上面的代码有了一个基本的使用,那么如果我们通过etcd来做配置管理,如果配置更改之后,我们如何通知对应的服务器配置更改,通过下面例子演示:
package main
import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
func main() {
cli,err := clientv3.New(clientv3.Config{
Endpoints:[]string{"192.168.0.118:2371"},
DialTimeout:5*time.Second,
})
if err != nil {
fmt.Println("connect failed,err:",err)
return
}
defer cli.Close()
// 这里会阻塞
rch := cli.Watch(context.Background(),"logagent/conf/")
for wresp := range rch{
for _,ev := range wresp.Events{
fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
实现一个kafka的消费者代码的简单例子:
package main
import (
"github.com/Shopify/sarama"
"strings"
"fmt"
"time"
)
func main() {
consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
if err != nil{
fmt.Println("failed to start consumer:",err)
return
}
partitionList,err := consumer.Partitions("nginx_log")
if err != nil {
fmt.Println("Failed to get the list of partitions:",err)
return
}
fmt.Println(partitionList)
for partition := range partitionList{
pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
return
}
defer pc.AsyncClose()
go func(partitionConsumer sarama.PartitionConsumer){
for msg := range pc.Messages(){
fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
}
}(pc)
}
time.Sleep(time.Hour)
consumer.Close()
}
但是上面的代码并不是最佳代码,因为我们最后是通过time.sleep等待goroutine的执行,我们可以更改为通过sync.WaitGroup方式实现
package main
import (
"github.com/Shopify/sarama"
"strings"
"fmt"
"sync"
)
var (
wg sync.WaitGroup
)
func main() {
consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
if err != nil{
fmt.Println("failed to start consumer:",err)
return
}
partitionList,err := consumer.Partitions("nginx_log")
if err != nil {
fmt.Println("Failed to get the list of partitions:",err)
return
}
fmt.Println(partitionList)
for partition := range partitionList{
pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
return
}
defer pc.AsyncClose()
go func(partitionConsumer sarama.PartitionConsumer){
wg.Add(1)
for msg := range partitionConsumer.Messages(){
fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
}
wg.Done()
}(pc)
}
//time.Sleep(time.Hour)
wg.Wait()
consumer.Close()
}
将客户端需要收集的日志信息放到etcd中
关于etcd处理的代码为: