序
本文主要研究一下 cortex 的 kv.Client。
kv.Client
github.com/cortexproject/cortex/pkg/ring/kv/client.go
// Client is a high-level client for key-value stores (such as Etcd and
// Consul) that exposes operations such as CAS and Watch which take callbacks.
// It also deals with serialisation by using a Codec and having an instance of
// the desired type passed in to methods, a la json.Unmarshal.
type Client interface {
	// List returns a list of keys under the given prefix. Returned keys will
	// include the prefix.
	List(ctx context.Context, prefix string) ([]string, error)
	// Get a specific key. Will use a codec to deserialise key to appropriate type.
	// If the key does not exist, Get will return nil and no error.
	Get(ctx context.Context, key string) (interface{}, error)
	// Delete a specific key. Deletions are best-effort and no error will
	// be returned if the key does not exist.
	Delete(ctx context.Context, key string) error
	// CAS stands for Compare-And-Swap. Will call provided callback f with the
	// current value of the key and allow callback to return a different value.
	// Will then attempt to atomically swap the current value for the new value.
	// If that doesn't succeed will try again - callback will be called again
	// with new value etc. Guarantees that only a single concurrent CAS
	// succeeds. Callback can return nil to indicate it is happy with existing
	// value.
	// NOTE(review): the exact meaning of the callback's retry and err results
	// is not spelled out in this declaration - confirm against an implementation.
	CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error
	// WatchKey calls f whenever the value stored under key changes.
	// NOTE(review): the meaning of f's bool result is not visible here -
	// presumably returning false stops the watch; confirm.
	WatchKey(ctx context.Context, key string, f func(interface{}) bool)
	// WatchPrefix calls f whenever any value stored under prefix changes.
	// NOTE(review): f presumably receives the changed key and its value, and
	// its bool result presumably controls whether watching continues; confirm.
	WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)
}
复制代码
kv.Client 接口定义了 List、Get、Delete、CAS、WatchKey、WatchPrefix 方法。
Client
github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go
// Client implements kv.Client interface, by using memberlist.KV.
// Its methods forward to kv, passing codec along where a value has to be
// encoded or decoded.
type Client struct {
	kv *KV // reference to singleton memberlist-based KV
	// codec is handed to the KV on Get, CAS, WatchKey and WatchPrefix calls.
	codec codec.Codec
}
// List is part of kv.Client interface.
// It first calls awaitKVRunningOrStopping with ctx and returns that error
// (with a nil slice) if it fails; otherwise it returns the keys reported by
// the underlying KV for the given prefix.
func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {
	if waitErr := c.awaitKVRunningOrStopping(ctx); waitErr != nil {
		return nil, waitErr
	}

	keys := c.kv.List(prefix)
	return keys, nil
}
// Get is part of kv.Client interface.
// It first calls awaitKVRunningOrStopping with ctx and returns (nil, err) if
// that fails; otherwise it returns whatever the underlying KV's Get yields
// for key, using this client's codec.
func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
	if waitErr := c.awaitKVRunningOrStopping(ctx); waitErr != nil {
		return nil, waitErr
	}

	return c.kv.Get(key, c.codec)
}
// Delete is part of kv.Client interface.
// This implementation never deletes anything: it ignores ctx and key and
// always returns an error saying that memberlist does not support Delete.
func (c *Client) Delete(ctx context.Context, key string) error {
	unsupported := errors.New("memberlist does not support Delete")
	return unsupported
}
// CAS is part of kv.Client interface.
// It first calls awaitKVRunningOrStopping with ctx and returns that error if
// it fails; otherwise it delegates to the underlying KV's CAS with this
// client's codec and the callback f, returning its result.
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
	if waitErr := c.awaitKVRunningOrStopping(ctx); waitErr != nil {
		return waitErr
	}

	return c.kv.CAS(ctx, key, c.codec, f)
}
// WatchKey is part of kv.Client interface.
// It delegates to the underlying KV's WatchKey (with this client's codec)
// only when awaitKVRunningOrStopping succeeds; if that call returns an error
// the method returns without watching, and the error is not reported.
func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
	if waitErr := c.awaitKVRunningOrStopping(ctx); waitErr == nil {
		c.kv.WatchKey(ctx, key, c.codec, f)
	}
}
// WatchPrefix calls f whenever any value stored under prefix changes.
// Part of kv.Client interface.
// If awaitKVRunningOrStopping returns an error, the method returns without
// watching and the error is not reported; otherwise the watch is delegated
// to the underlying KV together with this client's codec.
func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
	if c.awaitKVRunningOrStopping(ctx) != nil {
		return
	}

	c.kv.WatchPrefix(ctx, prefix, c.codec, f)
}
复制代码