Codis源码分析之Slots迁移篇

Codis源码分析之Slots迁移篇一、Slots迁移的场景&主要面临的问题为什么需要Slots迁移,或者说在什么场景下需要迁移?主要是为了扩容,Codis以Slot为单位

大家好,欢迎来到IT知识分享网。

一、Slots迁移的场景&主要面临的问题

为什么需要Slots迁移,或者说在什么场景下需要迁移?主要是为了扩容,Codis以Slot为单位将整个集群分成了1024个Slots,因此如果在运行过程中想增加服务器,就需要将原有的一些Slots迁移到新的服务器上。

迁移主要的问题:

1、Slot中Key的处理

一个Slot下可能有很多key,因此整个Slot迁移是需要时间的,因此整个Slot在迁移过程中key就有不同的情况,有的正在迁,有的还没迁,有的则已经迁走,针对这些不同状态的key, Codis是如何保证数据的一致性的。

2、大key的处理

像一个list或hash,可能成员成千上万,如何保证迁移的原子性和一致性。

3、如何保障系统的可用性

因为迁移是耗时的,是用同步还是异步,如何在系统可用性和数据一致性做权衡?

二、迁移代码分析

1、入口

迁移一般在Fe界面上由管理员发起,一般来说是迁移一个范围:

Codis源码分析之Slots迁移篇

后端入口为apiServer::SlotCreateActionRange,这个函数只做一些基本的参数验证,实际调用Topom的SlotCreateActionRange:

func (s *Topom) SlotCreateActionRange(beg, end int, gid int, must bool) error {   //省略一些代码 var pending []int for sid := beg; sid <= end; sid++ { m, err := ctx.getSlotMapping(sid) if err != nil { return err } if m.Action.State != models.ActionNothing { if !must { continue } return errors.Errorf("slot-[%d] action already exists", sid) } if m.GroupId == g.Id { if !must { continue } return errors.Errorf("slot-[%d] already in group-[%d]", sid, g.Id) } pending = append(pending, m.Id) } for _, sid := range pending { m, err := ctx.getSlotMapping(sid) if err != nil { return err } defer s.dirtySlotsCache(m.Id)     //更改状态 m.Action.State = models.ActionPending m.Action.Index = ctx.maxSlotActionIndex() + 1 m.Action.TargetId = g.Id //更新Zookeeper的状态 if err := s.storeUpdateSlotMapping(m); err != nil { return err } } return nil }

先会检查一些状态,如该Slot是否正在迁移,目标Group和当前Group是否一致,后面重点逻辑是将状态改为ActionPending,然后保存到Zk中就返回给用户了。

到上面肯定还没迁完,应该是有后台程序扫描这个状态然后进行迁移,这个入口为Topom::ProcessSlotAction,这个协程随着Dashboard启动的时候启动:

go func() { for !s.IsClosed() { if s.IsOnline() { if err := s.ProcessSlotAction(); err != nil { log.WarnErrorf(err, "process slot action failed") time.Sleep(time.Second * 5) } } time.Sleep(time.Second) } }()

具体代码如下:

func (s *Topom) ProcessSlotAction() error { for s.IsOnline() { var ( marks = make(map[int]bool) plans = make(map[int]bool) ) var accept = func(m *models.SlotMapping) bool { if marks[m.GroupId] || marks[m.Action.TargetId] { return false } if plans[m.Id] { return false } return true } var update = func(m *models.SlotMapping) bool { if m.GroupId != 0 { marks[m.GroupId] = true } marks[m.Action.TargetId] = true plans[m.Id] = true return true } var parallel = math2.MaxInt(1, s.config.MigrationParallelSlots) for parallel > len(plans) {     //状态转移在这里完成 _, ok, err := s.SlotActionPrepareFilter(accept, update) if err != nil { return err } else if !ok { break } } if len(plans) == 0 { return nil } var fut sync2.Future for sid, _ := range plans { fut.Add() go func(sid int) { log.Warnf("slot-[%d] process action", sid)         //重点,真正的数据迁移 var err = s.processSlotAction(sid) if err != nil { status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err) s.action.progress.status.Store(status) } else { s.action.progress.status.Store("") } fut.Done(strconv.Itoa(sid), err) }(sid) } for _, v := range fut.Wait() { if v != nil { return v.(error) } } time.Sleep(time.Millisecond * 10) } return nil }
func (s *Topom) SlotActionPrepareFilter(accept, update func(m *models.SlotMapping) bool) (int, bool, error) { //省略一些代码 switch m.Action.State {   case models.ActionPending: m.Action.State = models.ActionPreparing if err := s.storeUpdateSlotMapping(m); err != nil { return 0, false, err }     fallthrough   case models.ActionPreparing: m.Action.State = models.ActionPrepared     if err := s.resyncSlotMappings(ctx, m); err != nil { return 0, false, err } if err := s.storeUpdateSlotMapping(m); err != nil { return 0, false, err }     fallthrough   case models.ActionPrepared: m.Action.State = models.ActionMigrating if err := s.resyncSlotMappings(ctx, m); err != nil { log.Warnf("slot-[%d] resync to migrating failed", m.Id) return 0, false, err } if err := s.storeUpdateSlotMapping(m); err != nil { return 0, false, err }     fallthrough   case models.ActionMigrating:     return m.Id, true, nil   case models.ActionFinished:     return m.Id, true, nil   default:     return 0, false, errors.Errorf("slot-[%d] action state is invalid", m.Id) } }

可以看到整个的状态变换过程如下:

ActionPending =》ActionPreparing =》ActionPrepared

=> ActionMigrating => ActionFinished

在ActionMigrating之前变更都只是更新Zk中的状态,ActionPreparing和ActionPrepared还会调用resyncSlotMappings通过Proxy重连新的Redis Server并且设置slot从哪迁移等信息:

case models.ActionPrepared: fallthrough case models.ActionMigrating: slot.BackendAddr = ctx.getGroupMaster(m.Action.TargetId) slot.BackendAddrGroupId = m.Action.TargetId slot.MigrateFrom = ctx.getGroupMaster(m.GroupId) slot.MigrateFromGroupId = m.GroupId

然后看实际的数据迁移是怎么发生的,回到ProcessSlotAction方法

var err = s.processSlotAction(sid)
func (s *Topom) processSlotAction(sid int) error { var db int = 0 for s.IsOnline() { if exec, err := s.newSlotActionExecutor(sid); err != nil { return err } else if exec == nil { time.Sleep(time.Second) } else { n, nextdb, err := exec(db) if err != nil { return err } log.Debugf("slot-[%d] action executor %d", sid, n)       //迁移完成判断 if n == 0 && nextdb == -1 { return s.SlotActionComplete(sid) } status := fmt.Sprintf("[OK] Slot[%04d]@DB[%d]=%d", sid, db, n) s.action.progress.status.Store(status) if us := s.GetSlotActionInterval(); us != 0 { time.Sleep(time.Microsecond * time.Duration(us)) } db = nextdb } } return nil }

通过newSlotActionExecutor得到执行器,

switch method { case models.ForwardSync: do = func() (int, error) { return c.MigrateSlot(sid, dest) } case models.ForwardSemiAsync: var option = &redis.MigrateSlotAsyncOption{ MaxBulks: s.config.MigrationAsyncMaxBulks, MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(), NumKeys: s.config.MigrationAsyncNumKeys, Timeout: math2.MinDuration(time.Second*5, s.config.MigrationTimeout.Duration()), } do = func() (int, error) { return c.MigrateSlotAsync(sid, dest, option) }

可以看到迁移分同步和异步,看同步:

func (c *Client) MigrateSlot(slot int, target string) (int, error) { host, port, err := net.SplitHostPort(target) if err != nil { return 0, errors.Trace(err) } mseconds := int(c.Timeout / time.Millisecond) if reply, err := c.Do("SLOTSMGRTTAGSLOT", host, port, mseconds, slot); err != nil { return 0, errors.Trace(err) } else { p, err := redigo.Ints(redigo.Values(reply, nil)) if err != nil || len(p) != 2 { return 0, errors.Errorf("invalid response = %v", reply) } return p[1], nil } }

可以看到如果是同步迁移会调用SLOTSMGRTTAGSLOT命令进行迁移,这是一个Codis对Redis改造的命令,会随机迁移Slot下一个Key,所以在上面有判断是否迁移完成的:

func (s *Topom) processSlotAction(sid int) error { var db int = 0 for s.IsOnline() { if exec, err := s.newSlotActionExecutor(sid); err != nil {       return err } else {       n, nextdb, err := exec(db) //迁移完成判断 if n == 0 && nextdb == -1 { return s.SlotActionComplete(sid) }    } } return nil }

即命令返回2个参数(第3个异常忽略),第1个表示迁移的数量,第2个表示下一个要迁移的数据库,如果前者为0后者为-1则表示迁移完成。迁移完成后调用SlotActionComplete标记迁移完成

case models.ActionFinished: log.Warnf("slot-[%d] resync to finished", m.Id) if err := s.resyncSlotMappings(ctx, m); err != nil { log.Warnf("slot-[%d] resync to finished failed", m.Id) return err } defer s.dirtySlotsCache(m.Id) m = &models.SlotMapping{ Id: m.Id, GroupId: m.Action.TargetId, } return s.storeUpdateSlotMapping(m)

2、迁移过程中Slot的key的读写处理

前面分析Proxy代码的时候讲过,一个请求会进入到Session的handleRequest:

func (s *Session) handleRequest(r *Request, d *Router) error { opstr, flag, err := getOpInfo(r.Multi) if err != nil { return err } r.OpStr = opstr r.OpFlag = flag r.Broken = &s.broken if flag.IsNotAllowed() { return fmt.Errorf("command '%s' is not allowed", opstr) } switch opstr { case "QUIT": return s.handleQuit(r) case "AUTH": return s.handleAuth(r) } if !s.authorized { if s.config.SessionAuth != "" { r.Resp = redis.NewErrorf("NOAUTH Authentication required") return nil } s.authorized = true } switch opstr { case "SELECT": return s.handleSelect(r) case "PING": return s.handleRequestPing(r, d) case "INFO": return s.handleRequestInfo(r, d) case "MGET": return s.handleRequestMGet(r, d) case "MSET": return s.handleRequestMSet(r, d) case "DEL": return s.handleRequestDel(r, d) case "EXISTS": return s.handleRequestExists(r, d) case "SLOTSINFO": return s.handleRequestSlotsInfo(r, d) case "SLOTSSCAN": return s.handleRequestSlotsScan(r, d) case "SLOTSMAPPING": return s.handleRequestSlotsMapping(r, d) default: return d.dispatch(r) } }

默认会走到d.dispatch,如果是同步的话会走下面的逻辑:

func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) { if s.migrate.bc != nil && len(hkey) != 0 { if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil { log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s", s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err) return nil, err } } r.Group = &s.refs r.Group.Add(1) return d.forward2(s, r), nil }

如果slot正在迁移会调用slotsmgrt处理,

unc (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uint) error { m := &Request{} m.Multi = []*redis.Resp{ redis.NewBulkBytes([]byte("SLOTSMGRTTAGONE")), redis.NewBulkBytes(s.backend.bc.host), redis.NewBulkBytes(s.backend.bc.port), redis.NewBulkBytes([]byte("3000")), redis.NewBulkBytes(hkey), }

可以看到,如果当前处理的key所属的slot正在迁移,则调用SLOTSMGRTTAGONE命令将这个key迁移完成再返回给客户端,即必须要迁移这个key完成才返回给客户端。

三、总结

1、Slots迁移由管理员在Fe手动发起,发起后Codis只是将Slot状态变成

ActionPending;

2、Codis后台线程会扫描上述状态的Slots,依次进行以下状态的转换:

ActionPending => ActionPreparing => ActionPrepared =>

ActionMigrating;

3、ActionMigrating状态的Slots由Codis向Redis Server发送SLOTSMGRTTAGSLOT命令随机迁移一个key,这个过程会一直持续,直到Slot下所有Key迁移完成;

4、迁移过程中的Slot下的操作如果是同步则会先等待key迁移操作完成才往下操作,只要下层Redis Server执行是原子的,则可以保证整个过程的原子性。

可以看到,整个过程还是比较复杂的,特别是一些核心逻辑在Redis Server了,在Redis Server层如何保证操作的原子性和一致性,这个和异步迁移后面另外再讲述。

Codis初始化之Group及Slot篇

Codis源码分析之环境篇

Codis Proxy初始化篇

Codis Proxy是如何处理一个请求的

故障演练利器之ChaosBlade介绍

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/83434.html

(0)

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信