序
本文主要研究一下promtail的Client
Client
loki/pkg/promtail/client/client.go
// Client pushes entries to Loki and can be stopped.
//
// It combines the api.EntryHandler method set (for receiving entries) with a
// Stop method (for shutting the client down).
type Client interface {
// Embedded so that every Client can be used wherever an api.EntryHandler
// is expected.
api.EntryHandler
// Stop stops the goroutine that sends batches of entries.
Stop()
}
复制代码
Client接口内嵌了api.EntryHandler接口,并在此基础上额外定义了Stop方法
EntryHandler
loki/pkg/promtail/api/types.go
// EntryHandler is something that can "handle" entries.
type EntryHandler interface {
// Handle receives one entry: its label set, its timestamp and its text.
// Failure is reported through the returned error; what counts as a failure
// is up to the implementation.
// NOTE(review): the second parameter is named `time`, the same as the
// package it is typed with; this has no effect here because an interface
// method has no body.
Handle(labels model.LabelSet, time time.Time, entry string) error
}
复制代码
EntryHandler接口定义了Handle方法
client
loki/pkg/promtail/client/client.go
// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
// logger is the logger held by this client.
logger log.Logger
// cfg is the client configuration.
cfg Config
// client is the underlying HTTP client.
client *http.Client
// quit is closed by Stop to signal shutdown.
quit chan struct{}
// once guards the close of quit so that it happens at most one time.
once sync.Once
// entries is the channel Handle sends each new entry on.
entries chan entry
// wg is waited on by Stop before it returns.
// NOTE(review): presumably tracks the background sender goroutine — confirm
// against the constructor, which is not shown here.
wg sync.WaitGroup
// externalLabels, when non-empty, is merged with the label set of every
// entry passed to Handle.
externalLabels model.LabelSet
}
// Handle implements EntryHandler. It queues one log line for sending by
// placing it on c.entries; the push itself is not performed here.
//
// Steps, in order:
//  1. If the client has external labels, they are merged with ls.
//  2. The tenant ID is resolved from the (merged) label set.
//  3. If the reserved tenant-ID label is present, it is removed from a copy
//     of the label set so the caller's set is left untouched.
//  4. The entry is sent on c.entries.
//
// The send blocks until the channel accepts the value. Handle always returns
// nil.
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
	if len(c.externalLabels) > 0 {
		ls = c.externalLabels.Merge(ls)
	}

	// The tenant ID may have been overridden while the pipeline stages ran,
	// so resolve it from the labels before the special label is stripped.
	tenantID := c.getTenantID(ls)

	if _, hasTenantLabel := ls[ReservedLabelTenantID]; hasTenantLabel {
		// Work on a clone so the input label set is not mutated.
		ls = ls.Clone()
		delete(ls, ReservedLabelTenantID)
	}

	line := logproto.Entry{
		Timestamp: t,
		Line:      s,
	}
	c.entries <- entry{tenantID, ls, line}

	return nil
}
// Stop shuts the client down. It closes the quit channel (guarded by c.once,
// so repeated calls do not close it twice) and then blocks until c.wg is
// done.
func (c *client) Stop() {
	signalQuit := func() {
		close(c.quit)
	}
	c.once.Do(signalQuit)

	c.wg.Wait()
}
复制代码
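client定义了logger、cfg、client、quit、once、entries、wg、externalLabels属性;它实现了Client接口的Handle、Stop方法;Handle方法先在externalLabels非空时将其与传入的LabelSet合并,再通过getTenantID获取tenantID;接着判断LabelSet是否包含ReservedLabelTenantID,如果包含则先执行ls.Clone()复制一份,再从副本中移除该label,以免修改传入的LabelSet;最后构造entry发送到c.entries这个channel并返回nil;Stop方法通过sync.Once保证close(c.quit)只执行一次,然后调用c.wg.Wait()等待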