序
本文主要研究一下 Cortex 的 ReadRing。
ReadRing
cortex/pkg/ring/ring.go
// ReadRing represents the read interface to the ring. It embeds
// prometheus.Collector, so every implementation is also a metrics collector.
type ReadRing interface {
prometheus.Collector
// Get returns n (or more) ingesters which form the replicas for the given key.
// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
// to avoid memory allocation; they can be nil, or created with ring.MakeBuffersForGet().
Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error)
// GetAllHealthy returns all healthy instances in the ring, for the given operation.
// This function doesn't check if the quorum is honored, so it doesn't fail if the number
// of unhealthy ingesters is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)
// GetReplicationSetForOperation returns all instances where the input operation should be executed.
// The resulting ReplicationSet doesn't necessarily contain all healthy instances
// in the ring, but could contain the minimum set of instances required to execute
// the input operation.
GetReplicationSetForOperation(op Operation) (ReplicationSet, error)
// ReplicationFactor returns an int; presumably the replication factor the ring
// is configured with (TODO confirm against the implementation).
ReplicationFactor() int
// IngesterCount returns an int; presumably the number of ingesters currently
// registered in the ring (TODO confirm against the implementation).
IngesterCount() int
// ShuffleShard returns a subring for the provided identifier (e.g. a tenant ID)
// and size (number of instances).
ShuffleShard(identifier string, size int) ReadRing
// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
// HasInstance returns whether the ring contains an instance matching the provided instanceID.
HasInstance(instanceID string) bool
}
复制代码
ReadRing 接口内嵌了 prometheus.Collector，并定义了 Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance 方法。
Get
cortex/pkg/ring/ring.go
// Get selects the instances that form the replica set for key and returns
// them, after filtering through the ring's replication strategy, as a
// ReplicationSet.
//
// Selection starts at the token index returned by searchToken for key and
// moves forward through r.ringTokens, wrapping around at the end, visiting
// each token at most once. An instance is picked only the first time it is
// seen and, when zone awareness is enabled and the instance has a zone set,
// only if no previously picked instance shares that zone.
//
// bufDescs, bufHosts and bufZones are truncated to length zero and reused as
// scratch storage; nil slices are accepted.
//
// It returns ErrEmptyRing when the ring has no descriptor or no tokens,
// ErrInconsistentTokensInfo when a token has no owning instance, and any
// error reported by the strategy's Filter.
func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	tokenCount := len(r.ringTokens)
	if r.ringDesc == nil || tokenCount == 0 {
		return ReplicationSet{}, ErrEmptyRing
	}

	// Slices rather than maps are used for the "seen" sets: with only a
	// handful of entries a linear scan beats a map lookup.
	seenHosts := bufHosts[:0]
	seenZones := bufZones[:0]
	picked := bufDescs[:0]

	wanted := r.cfg.ReplicationFactor
	first := searchToken(r.ringTokens, key)

	for step := 0; len(seenHosts) < wanted && step < tokenCount; step++ {
		// Index of the token visited at this step, wrapped around the ring.
		pos := (first + step) % tokenCount

		owner, found := r.ringInstanceByToken[r.ringTokens[pos]]
		if !found {
			// A token without an owner indicates a bug in the ring code.
			return ReplicationSet{}, ErrInconsistentTokensInfo
		}

		// Each instance may contribute at most once.
		if util.StringsContain(seenHosts, owner.InstanceID) {
			continue
		}

		// Instances without a zone are exempt from the distinct-zone rule.
		zoneApplies := r.cfg.ZoneAwarenessEnabled && owner.Zone != ""
		switch {
		case zoneApplies && util.StringsContain(seenZones, owner.Zone):
			continue
		case zoneApplies:
			seenZones = append(seenZones, owner.Zone)
		}

		seenHosts = append(seenHosts, owner.InstanceID)
		desc := r.ringDesc.Ingesters[owner.InstanceID]

		// Ask for one more instance when the operation says this instance's
		// state should extend the replica set.
		if op.ShouldExtendReplicaSetOnState(desc.State) {
			wanted++
		}

		picked = append(picked, desc)
	}

	healthy, maxErrors, err := r.strategy.Filter(picked, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
	if err != nil {
		return ReplicationSet{}, err
	}

	return ReplicationSet{Ingesters: healthy, MaxErrors: maxErrors}, nil
}
复制代码