序
本文主要研究一下cortex的Ingester
Ingester
cortex/pkg/api/api.go
// Ingester describes what the API layer requires from an ingester. It is
// declared as an interface so that alternative ingester implementations can
// be handed to the API.RegisterIngester() method.
type Ingester interface {
	// The complete gRPC server surface of the ingester service.
	client.IngesterServer

	// Push is listed explicitly even though the embedded
	// client.IngesterServer already declares a method with this signature.
	Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error)

	// FlushHandler and ShutdownHandler have the shape of HTTP handler
	// functions. NOTE(review): presumably they trigger a flush / a shutdown
	// of the ingester — confirm against the implementation.
	FlushHandler(w http.ResponseWriter, r *http.Request)
	ShutdownHandler(w http.ResponseWriter, r *http.Request)
}
复制代码
Ingester接口内嵌了client.IngesterServer，并显式声明了FlushHandler、ShutdownHandler、Push方法；其中Push与client.IngesterServer中的Push签名相同，属于重复声明
client.IngesterServer
cortex/pkg/ingester/client/cortex.pb.go
// IngesterServer is the server API for Ingester service.
//
// Unary methods take a context plus a request message and return a response
// message or an error; streaming methods take a stream handle and return
// only an error.
type IngesterServer interface {
	// Write path.
	Push(ctx context.Context, req *WriteRequest) (*WriteResponse, error)

	// Read path: Query is unary, QueryStream sends its results through the
	// supplied stream handle.
	Query(ctx context.Context, req *QueryRequest) (*QueryResponse, error)
	QueryStream(req *QueryRequest, stream Ingester_QueryStreamServer) error

	// Label, metric and metadata lookups.
	LabelValues(ctx context.Context, req *LabelValuesRequest) (*LabelValuesResponse, error)
	LabelNames(ctx context.Context, req *LabelNamesRequest) (*LabelNamesResponse, error)
	MetricsForLabelMatchers(ctx context.Context, req *MetricsForLabelMatchersRequest) (*MetricsForLabelMatchersResponse, error)
	MetricsMetadata(ctx context.Context, req *MetricsMetadataRequest) (*MetricsMetadataResponse, error)

	// Statistics. Both methods take the same request type; AllUserStats
	// returns a UsersStatsResponse instead of a UserStatsResponse.
	UserStats(ctx context.Context, req *UserStatsRequest) (*UserStatsResponse, error)
	AllUserStats(ctx context.Context, req *UserStatsRequest) (*UsersStatsResponse, error)

	// TransferChunks lets a leaving ingester (the client) stream its chunks
	// directly to a joining ingester (the server).
	TransferChunks(stream Ingester_TransferChunksServer) error
}
复制代码
client.IngesterServer接口定义了Push、Query、QueryStream、LabelValues、LabelNames、UserStats、AllUserStats、MetricsForLabelMatchers、MetricsMetadata、TransferChunks方法
Push
cortex/pkg/ingester/ingester.go
// Push implements client.IngesterServer.
//
// After checking the ingester state it either delegates to v2Push (when
// blocks storage is enabled) or appends every sample in req for the tenant
// resolved from ctx, logging the accumulated record to the WAL when the WAL
// is enabled.
//
// A validation error on an individual sample does not abort the request: the
// first such error is remembered and returned, together with an empty
// WriteResponse, once all series have been processed. Any other append error
// aborts the request immediately and yields a nil response. A nil response is
// also returned when the state check fails, when no tenant ID can be resolved
// from ctx, or when writing to the WAL fails.
func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
	if err := i.checkRunningOrStopping(); err != nil {
		return nil, err
	}

	// Blocks storage has its own write path.
	if i.cfg.BlocksStorageEnabled {
		return i.v2Push(ctx, req)
	}

	// NOTE: deserialisation relies on `unsafe`, so nothing taken from `req`
	// may be retained once ReuseSlice has run.
	defer client.ReuseSlice(req.Timeseries)

	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, fmt.Errorf("no user id")
	}

	// Metadata is best-effort and its outcome is not checked here, so it is
	// handled before the samples; otherwise an ingestion error could make us
	// return before the metadata was processed.
	i.pushMetadata(ctx, userID, req.GetMetadata())

	var (
		partialErr *validationError // first validation failure seen, if any
		walRecord  *WALRecord       // stays nil when the WAL is disabled
	)

	if i.cfg.WALConfig.WALEnabled {
		walRecord = recordPool.Get().(*WALRecord)
		walRecord.UserID = userID
		// Little series churn is expected in most cases, so there is no use
		// keeping a series slice from a previous use of this record around.
		walRecord.Series = nil
		// Reuse the pooled sample buffer when it has room for at least one
		// entry per series in the request; allocate a fresh one otherwise.
		if wanted := len(req.Timeseries); cap(walRecord.Samples) >= wanted {
			walRecord.Samples = walRecord.Samples[:0]
		} else {
			walRecord.Samples = make([]tsdb_record.RefSample, 0, wanted)
		}
	}

	for _, series := range req.Timeseries {
		ingested := 0

		for _, sample := range series.Samples {
			// append() copies the memory in `series.Labels` except on the error path.
			appendErr := i.append(ctx, userID, series.Labels, model.Time(sample.TimestampMs), model.SampleValue(sample.Value), req.Source, walRecord)
			if appendErr != nil {
				i.metrics.ingestedSamplesFail.Inc()

				vErr, isValidation := appendErr.(*validationError)
				if !isValidation {
					// Anything other than a validation error abandons the whole request.
					return nil, grpcForwardableError(userID, http.StatusInternalServerError, appendErr)
				}
				// Only the first validation failure is reported to the caller.
				if partialErr == nil {
					partialErr = vErr
				}
				continue
			}

			ingested++
		}

		if i.cfg.ActiveSeriesMetricsEnabled && ingested > 0 {
			// updateActiveSeries will copy labels if necessary.
			i.updateActiveSeries(userID, time.Now(), series.Labels)
		}
	}

	if walRecord != nil {
		// This point is reached only when no non-validation error aborted the
		// request; validation failures alone do not prevent the record from
		// being logged.
		if err := i.wal.Log(walRecord); err != nil {
			return nil, err
		}
		recordPool.Put(walRecord)
	}

	if partialErr == nil {
		return &client.WriteResponse{}, nil
	}

	// grpcForwardableError turns the error into a string so it no longer references `req`.
	return &client.WriteResponse{}, grpcForwardableError(userID, partialErr.code, partialErr)
}
复制代码