Skip to main content

Ristretto 高性能 go 语言缓存

·5 mins

dgraph-io/ristretto 是一个高性能, 可限制内存的局部缓存.

注意: 本文基于 ristretto 最初版本 v0.0.1.

特色 #

  • store 分片, 缩小锁粒度
  • batch 操作, sync.Pool + ring buffer 允许数据丢失
  • 允许写失败, buffer channel 满时直接丢弃
  • TinyLFU 准入控制, sampleLFU 淘汰控制
  • 将 key hash 值也存起来, 减少重复计算

细节分析 #

batch 操作, sync.Pool + ring buffer 允许数据丢失 #

https://github.com/zcong1993/ristretto/blob/learn/ring.go

LFU cache 每次操作都要维护 key 的 freq 值, 而且一般来说这个组件和 cache 一样是共享的, 会带来很多锁竞争.

1. 引入 ring buffer 做批操作 (并发不安全) #

// Push appends an item in the ring buffer and drains (copies items and
// sends to Consumer) if full.
func (s *ringStripe) Push(item uint64) {
  s.data = append(s.data, item)
  // if we should drain
  if len(s.data) >= s.capa {
    // Send elements to consumer. Create a new one.
    if s.cons.Push(s.data) {
      s.data = make([]uint64, 0, s.capa)
    } else {
      s.data = s.data[:0]
    }
  }
}

批操作, 当数据量达到 capa 时, 发给 consumer, 这里处理很激进, 如果 consumer 返回 false (阻塞), 会直接丢弃数据.

2. sync.Pool 无锁包裹, 保存临时数据 #

// ringBuffer stores multiple buffers (stripes) and distributes Pushed items
// between them to lower contention.
//
// This implements the "batching" process described in the BP-Wrapper paper
// (section III part A).
type ringBuffer struct {
  pool *sync.Pool
}
// newRingBuffer returns a striped ring buffer. The Consumer in ringConfig will
// be called when individual stripes are full and need to drain their elements.
func newRingBuffer(cons ringConsumer, capa int64) *ringBuffer {
  // LOSSY buffers use a very simple sync.Pool for concurrently reusing
  // stripes. We do lose some stripes due to GC (unheld items in sync.Pool
  // are cleared), but the performance gains generally outweigh the small
  // percentage of elements lost. The performance primarily comes from
  // low-level runtime functions used in the standard library that aren't
  // available to us (such as runtime_procPin()).
  return &ringBuffer{
    pool: &sync.Pool{
      New: func() interface{} { return newRingStripe(cons, capa) },
    },
  }
}
// Push adds an element to one of the internal stripes and possibly drains if
// the stripe becomes full.
func (b *ringBuffer) Push(item uint64) {
  // reuse or create a new stripe
  stripe := b.pool.Get().(*ringStripe)
  stripe.Push(item)
  b.pool.Put(stripe)
}

sync.Pool.Get() 操作可以并发, 如果有可用对象直接返回, 如果没有会调用 New 方法创建新的. sync.Pool 里面的对象是临时对象, 2 个 gc 周期会被回收, 所以这里也会丢数据. RingStripe 实例化对象保存在 pool 中, push 的 item 也会临时保存, 如果 item 数量没达到该 RingStripe 的 capa, 并且过了两个 gc 周期, 会丢失此对象的 items (对象被回收了).

key hash #

string 和 []byte 使用 go 语言用的 memhash, 性能很高

// TODO: Figure out a way to re-use memhash for the second uint64 hash, we
//       already know that appending bytes isn't reliable for generating a
//       second hash (see Ristretto PR #88).
//
//       We also know that while the Go runtime has a runtime memhash128
//       function, it's not possible to use it to generate [2]uint64 or
//       anything resembling a 128bit hash, even though that's exactly what
//       we need in this situation.
func KeyToHash(key interface{}) (uint64, uint64) {
  if key == nil {
    return 0, 0
  }
  switch k := key.(type) {
  case uint64:
    return k, 0
  case string:
    raw := []byte(k)
    return MemHash(raw), xxhash.Sum64(raw)
  case []byte:
    return MemHash(k), xxhash.Sum64(k)
  case byte:
    return uint64(k), 0
  case int:
    return uint64(k), 0
  case int32:
    return uint64(k), 0
  case uint32:
    return uint64(k), 0
  case int64:
    return uint64(k), 0
  default:
    panic("Key type not supported")
  }
}

store 分片, 缩小锁粒度 #

store 实现比较简单, 组合锁和标准 map. 为了减少锁粒度提高性能做了分片.

type lockedMap struct {
  sync.RWMutex
  data map[uint64]storeItem
}

const numShards uint64 = 256

type shardedMap struct {
  shards []*lockedMap
}

func newShardedMap() *shardedMap {
  sm := &shardedMap{
    shards: make([]*lockedMap, int(numShards)),
  }
  for i := range sm.shards {
    sm.shards[i] = newLockedMap()
  }
  return sm
}

func (sm *shardedMap) Get(key, conflict uint64) (interface{}, bool) {
  return sm.shards[key%numShards].Get(key, conflict)
}

func (sm *shardedMap) Set(key, conflict uint64, value interface{}) {
  sm.shards[key%numShards].Set(key, conflict, value)
}

允许写失败, buffer channel 满时直接丢弃 #

func (c *Cache) Set(key, value interface{}, cost int64) bool {
  if c == nil || key == nil {
    return false
  }
  keyHash, conflictHash := c.keyToHash(key)
  i := &item{
    flag:     itemNew,
    key:      keyHash,
    conflict: conflictHash,
    value:    value,
    cost:     cost,
  }
  // attempt to immediately update hashmap value and set flag to update so the
  // cost is eventually updated
  if c.store.Update(keyHash, conflictHash, i.value) {
    i.flag = itemUpdate
  }
  // attempt to send item to policy
  select {
  case c.setBuf <- i:
    return true
  default:
    c.Metrics.add(dropSets, keyHash, 1)
    return false
  }
}

值得注意的是 Set 方法包含 create 和 update, update 操作是不能丢弃和延迟的(不然会读到脏缓存), 所以会先尝试做 update 操作.

这里 channel 操作相当于异步写, 会出现 Set 之后 Get 可能会拿不到缓存, 但是这都是为了尽可能提高 Set 性能.

TinyLFU 准入控制 #

TinyLFU 基于此 论文, 优化了内存使用.

对外提供三个方法:

  • Increment(key uint64) // 增加 freq
  • Estimate(key uint64) int (referred as ɛ) // 获取
  • Reset // 重置

准入控制要保证进入元素的 ɛ 大于淘汰的元素, 否则不允许进入. 简单来说, ristretto 永远会优先缓存 更有价值 的数据.

Increment 会在 Get 调用是通过 sync.Pool + ring buffer batch 更新.

Estimate 会在存储空间达到阈值时, 会随机获取 5 个旧元素, 如果新元素 ɛ 小于所有选取的旧元素, 直接拒绝, 不允许进入缓存. 否则淘汰掉 ɛ 最小的元素, 再随机补充一个旧元素, 重复操作直到空间充足.

// Snippet from the Admission and Eviction Algorithm
incHits := p.admit.Estimate(key)
for ; room < 0; room = p.evict.roomLeft(cost) {
    sample = p.evict.fillSample(sample)
    minKey, minHits, minId := uint64(0), int64(math.MaxInt64), 0
    for i, pair := range sample {
        if hits := p.admit.Estimate(pair.key); hits < minHits {
            minKey, minHits, minId = pair.key, hits, i
        }
    }
    if incHits < minHits {
        p.stats.Add(rejectSets, key, 1)
        return victims, false
    }
    p.evict.del(minKey)
    sample[minId] = sample[len(sample)-1]
    sample = sample[:len(sample)-1]
    victims = append(victims, minKey)
}

还有一点要注意, 如果只用上面的处理方式, 老的元素 ɛ 值会一直增加, 新元素没有机会进入, 为了解决这个问题 在调用 n 次 Increment 方法时会调用 Reset 方法将 TinyLFU 重置.

其他优化点: 为了防止大量只出现一次的 key 使得 TinyLFU 里面数据量过大, 引入布隆过滤器, 只允许 Increment 过滤器中存在的 key, 所以会忽略掉第一次进来的 key. 调用 Estimate 时如果过滤器存在该 key ɛ 值会再额外 +1.

sampleLFU 淘汰控制 #

淘汰控制是在空间不足时执行, 参考上面代码片段.

fillSample 操作使用的是 go 语言 map 随机访问特性.

type sampledLFU struct {
  keyCosts map[uint64]int64
  maxCost  int64
  used     int64
  metrics  *Metrics
}

func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair {
  if len(in) >= lfuSample {
    return in
  }
  for key, cost := range p.keyCosts {
    in = append(in, &policyPair{key, cost})
    if len(in) >= lfuSample {
      return in
    }
  }
  return in
}

总结 #

提升吞吐方面, 从设计上支持数据丢弃, set 操作不确保成功, get 操作更新 freq 也做了批操作, 并且容忍丢失. 提升命中方面 TinyLFU 在准入时只允许 rank 分更高也就是更有价值的元素进入, 淘汰也是优先淘汰 rank 值低的, 永远尝试保留更有价值的数据.

参考资料 #

wxmp