聊聊tempo的ExclusiveQueues

聊聊tempo的ExclusiveQueues序本文主要研究一下tempo的ExclusiveQueuesExclusiveQueuestempo/pkg/flushqueues/exclu

大家好,欢迎来到IT知识分享网。

本文主要研究一下tempo的ExclusiveQueues

聊聊tempo的ExclusiveQueues

ExclusiveQueues

tempo/pkg/flushqueues/exclusivequeues.go

type ExclusiveQueues struct {     queues     []*util.PriorityQueue     index      *atomic.Int32     activeKeys sync.Map } 

ExclusiveQueues定义了queues、index、activeKeys属性

New

tempo/pkg/flushqueues/exclusivequeues.go

// New creates a new set of flush queues with a prom gauge to track current depth func New(queues int, metric prometheus.Gauge) *ExclusiveQueues {     f := &ExclusiveQueues{         queues: make([]*util.PriorityQueue, queues),         index:  atomic.NewInt32(0),     }     for j := 0; j < queues; j++ {         f.queues[j] = util.NewPriorityQueue(metric)     }     return f } 

New方法先创建ExclusiveQueues,然后根据指定的queue个数通过util.NewPriorityQueue(metric)创建PriorityQueue

Enqueue

tempo/pkg/flushqueues/exclusivequeues.go

// Enqueue adds the op to the next queue and prevents any other items to be added with this key func (f *ExclusiveQueues) Enqueue(op util.Op) {     _, ok := f.activeKeys.Load(op.Key())     if ok {         return     }     f.activeKeys.Store(op.Key(), struct{}{})     f.Requeue(op) } 

Enqueue方法先从activeKeys查找指定的key,若已经存在则提前返回,不存在则放入activeKeys中,然后执行f.Requeue(op)

Requeue

tempo/pkg/flushqueues/exclusivequeues.go

// Requeue adds an op that is presumed to already be covered by activeKeys func (f *ExclusiveQueues) Requeue(op util.Op) {     flushQueueIndex := int(f.index.Inc()) % len(f.queues)     f.queues[flushQueueIndex].Enqueue(op) } 

Requeue方法首先通过int(f.index.Inc()) % len(f.queues)计算flushQueueIndex,然后找到对应的queue,执行Enqueue方法

Dequeue

tempo/pkg/flushqueues/exclusivequeues.go

// Dequeue removes the next op from the requested queue.  After dequeueing the calling //  process either needs to call ClearKey or Requeue func (f *ExclusiveQueues) Dequeue(q int) util.Op {     return f.queues[q].Dequeue() } 

Dequeue方法执行f.queues[q]对应queue的Dequeue

Clear

tempo/pkg/flushqueues/exclusivequeues.go

// Clear unblocks the requested op.  This should be called only after a flush has been successful func (f *ExclusiveQueues) Clear(op util.Op) {     f.activeKeys.Delete(op.Key()) } 

Clear方法将指定key从activeKeys中移除

Stop

tempo/pkg/flushqueues/exclusivequeues.go

// Stop closes all queues func (f *ExclusiveQueues) Stop() {     for _, q := range f.queues {         q.Close()     } } 

Stop方法遍历f.queues,挨个执行q.Close()

小结

tempo的ExclusiveQueues定义了queues、index、activeKeys属性;它提供了Enqueue、Requeue、Dequeue、Clear、Stop方法。

doc

  • tempo

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/79914.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信