The Watch/Notify Mechanism
1. Overview
Note: this walkthrough is based on the Hammer release; the overall flow is similar in other versions.
- Ceph's Watch/Notify mechanism lets different RADOS clients communicate with one another so that they can keep their data consistent.
- It is implemented per RADOS object: librados exposes watch and notify interfaces, and users can register watches of their own (tgt, for example, registers its own watch).
- By default, librbd registers a watch on the rbd_header object when it opens an image, and uses it to propagate updates to the image's metadata.
- The model resembles publish-subscribe messaging: clients subscribe to messages with watch and publish them with notify.
Watch flow: (sequence diagram not reproduced here)
Notify flow: (sequence diagram not reproduced here)
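To make the publish-subscribe shape concrete, here is a minimal sketch of both roles built on the public librados C++ API (watch2/notify2, available since Hammer). The object name and the HeaderWatcher class are made up for illustration, and error handling is elided:

#include <rados/librados.hpp>
#include <iostream>

// Subscriber side: react to notifies on one object and ack each of them.
class HeaderWatcher : public librados::WatchCtx2 {
  librados::IoCtx& io;
  std::string oid;
public:
  HeaderWatcher(librados::IoCtx& io_, const std::string& oid_) : io(io_), oid(oid_) {}
  void handle_notify(uint64_t notify_id, uint64_t cookie,
                     uint64_t notifier_gid, librados::bufferlist& bl) {
    std::cout << "notify " << notify_id << " from gid " << notifier_gid << std::endl;
    librados::bufferlist ack;                   // optional per-watcher reply payload
    io.notify_ack(oid, notify_id, cookie, ack); // without the ack the notify times out
  }
  void handle_error(uint64_t cookie, int err) {
    std::cerr << "watch " << cookie << " broke: " << err << std::endl; // typically unwatch2 + watch2 here
  }
};

// Usage, assuming an initialized IoCtx io on some pool; the object must
// already exist (see section 2.3):
//   uint64_t handle;
//   HeaderWatcher w(io, "my_object");
//   io.watch2("my_object", &handle, &w);            // subscribe
//   librados::bufferlist msg, acks;
//   io.notify2("my_object", msg, 10 * 1000, &acks); // publish, 10s timeout
//   io.unwatch2(handle);                            // unsubscribe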
2. The Watch/Notify Implementation in Detail
2.1 watch at the librados layer
int librados::IoCtxImpl::watch(const object_t& oid,
                               uint64_t *handle,
                               librados::WatchCtx *ctx,
                               librados::WatchCtx2 *ctx2)
{
  ::ObjectOperation wr; // watch is a write op: the watcher info must be persisted so that after a restart the OSD still knows which clients have registered watches
  version_t objver;
  C_SaferCond onfinish;
  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); // create a new LingerOp and add it to the Objecter's linger_ops map and linger_ops set
  *handle = linger_op->get_cookie(); // the LingerOp's address cast to uint64_t
  linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2); // watch context the Objecter calls back into when watch/notify messages arrive
  prepare_assert_ops(&wr);
  wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH); // append a CEPH_OSD_WATCH_OP_WATCH op to persist the watcher info; the LingerOp address (the cookie) travels with it so the OSD can tell individual clients apart
  bufferlist bl;
  objecter->linger_watch(linger_op, wr,
                         snapc, ceph::real_clock::now(), bl,
                         &onfinish,
                         &objver); // hand the op to the Objecter, which sends it to the OSD
  int r = onfinish.wait(); // block until the op completes
  set_sync_op_version(objver);
  if (r < 0) {
    objecter->linger_cancel(linger_op);
    *handle = 0;
  }
  return r;
}
2.2 Sending the LingerOp (osdc layer)
As the call sequence shows, the linger op is ultimately handed to op_submit, which sends it to the OSD through the messenger layer.
ceph_tid_t Objecter::linger_watch(LingerOp *info,
                                  ObjectOperation& op,
                                  const SnapContext& snapc,
                                  real_time mtime,
                                  bufferlist& inbl,
                                  Context *oncommit,
                                  version_t *objver)
{
  info->is_watch = true; // Objecter::handle_watch_notify uses this flag to tell whether this client is a subscriber (watch) or the publisher (notify)
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = NULL;
  info->pobjver = objver;
  info->on_reg_commit = oncommit; // Objecter::handle_osd_op_reply(MOSDOpReply *m) runs oncommit_sync, which eventually completes this
  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);
  return info->linger_id;
}
void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
  assert(sul.owns_lock() && sul.mutex() == &rwlock);
  assert(info->linger_id);
  // Populate Op::target
  OSDSession *s = NULL;
  _calc_target(&info->target, &info->last_force_resend);
  // Create LingerOp<->OSDSession relation
  int r = _get_session(info->target.osd, &s, sul);
  assert(r == 0);
  OSDSession::unique_lock sl(s->lock);
  _session_linger_op_assign(s, info); // bind the LingerOp to the session of the OSD that Op::target maps to:
  // op->session = to;
  // to->linger_ops[op->linger_id] = op;
  sl.unlock();
  put_session(s);
  _send_linger(info, sul);
}
void Objecter::_send_linger(LingerOp *info,
                            shunique_lock& sul)
{
  assert(sul.owns_lock() && sul.mutex() == &rwlock);
  vector<OSDOp> opv;
  Context *oncommit = NULL;
  LingerOp::shared_lock watchl(info->watch_lock);
  bufferlist *poutbl = NULL;
  if (info->registered && info->is_watch) {
    // on the first watch registered == false, so this branch only handles reconnects
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
                   << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen;
    oncommit = new C_Linger_Reconnect(this, info);
  } else {
    // the first watch lands here
    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
                   << dendl;
    opv = info->ops;
    // set up the LingerOp completion callback
    C_Linger_Commit *c = new C_Linger_Commit(this, info);
    if (!info->is_watch) {
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = c;
  }
  watchl.unlock();
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 opv, info->target.flags | CEPH_OSD_FLAG_READ,
                 NULL, NULL,
                 info->pobjver);
  o->oncommit_sync = oncommit; // invoked from Objecter::handle_osd_op_reply
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;
  o->target = info->target;
  o->tid = last_tid.inc();
  // do not resend this; we will send a new op to reregister
  o->should_resend = false;
  if (info->register_tid) {
    // repeat send. cancel old registration op, if any.
    OSDSession::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      Op *o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);
      _cancel_linger_op(o);
    }
    sl.unlock();
    _op_submit(o, sul, &info->register_tid); // send the op via _op_submit
  } else {
    // first send
    _op_submit_with_budget(o, sul, &info->register_tid);
  }
  logger->inc(l_osdc_linger_send);
}
2.3 OSD handling of the LingerOp
int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) {
  ...
  case CEPH_OSD_OP_WATCH:
    ++ctx->num_write;
    {
      tracepoint(osd, do_osd_op_pre_watch, soid.oid.name.c_str(), soid.snap.val,
                 op.watch.cookie, op.watch.op);
      if (!obs.exists) {
        // the object must already exist before watch is called; otherwise registration fails
        result = -ENOENT;
        break;
      }
      uint64_t cookie = op.watch.cookie;
      entity_name_t entity = ctx->reqid.name;
      ObjectContextRef obc = ctx->obc;
      dout(10) << "watch " << ceph_osd_watch_op_name(op.watch.op)
               << ": ctx->obc=" << (void *)obc.get() << " cookie=" << cookie
               << " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl;
      dout(10) << "watch: oi.user_version=" << oi.user_version << dendl;
      dout(10) << "watch: peer_addr="
               << ctx->op->get_req()->get_connection()->get_peer_addr() << dendl;
      watch_info_t w(cookie, cct->_conf->osd_client_watch_timeout,
                     ctx->op->get_req()->get_connection()->get_peer_addr()); // build the watch info: the LingerOp cookie, the watch timeout (osd_client_watch_timeout = 30s), and the address of the client owning the LingerOp
      if (op.watch.op == CEPH_OSD_WATCH_OP_WATCH ||
          op.watch.op == CEPH_OSD_WATCH_OP_LEGACY_WATCH) {
        // check whether this watcher is already registered in the object's oi; if not, add it to oi.watchers and persist it
        if (oi.watchers.count(make_pair(cookie, entity))) {
          dout(10) << " found existing watch " << w << " by " << entity << dendl;
        } else {
          dout(10) << " registered new watch " << w << " by " << entity << dendl;
          oi.watchers[make_pair(cookie, entity)] = w;
          t->nop(); // make sure update the object_info on disk!
        }
        bool will_ping = (op.watch.op == CEPH_OSD_WATCH_OP_WATCH);
        ctx->watch_connects.push_back(make_pair(w, will_ping));
      } else if (op.watch.op == CEPH_OSD_WATCH_OP_RECONNECT) {
        if (!oi.watchers.count(make_pair(cookie, entity))) {
          result = -ENOTCONN;
          break;
        }
        dout(10) << " found existing watch " << w << " by " << entity << dendl;
        ctx->watch_connects.push_back(make_pair(w, true));
      } else if (op.watch.op == CEPH_OSD_WATCH_OP_PING) {
        if (!oi.watchers.count(make_pair(cookie, entity))) {
          result = -ENOTCONN;
          break;
        }
        map<pair<uint64_t,entity_name_t>,WatchRef>::iterator p =
          obc->watchers.find(make_pair(cookie, entity));
        if (p == obc->watchers.end() ||
            !p->second->is_connected()) {
          // client needs to reconnect
          result = -ETIMEDOUT;
          break;
        }
        dout(10) << " found existing watch " << w << " by " << entity << dendl;
        p->second->got_ping(ceph_clock_now(NULL));
        result = 0;
      } else if (op.watch.op == CEPH_OSD_WATCH_OP_UNWATCH) {
        map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter =
          oi.watchers.find(make_pair(cookie, entity));
        if (oi_iter != oi.watchers.end()) {
          dout(10) << " removed watch " << oi_iter->second << " by "
                   << entity << dendl;
          oi.watchers.erase(oi_iter);
          t->nop(); // update oi on disk
          ctx->watch_disconnects.push_back(
            watch_disconnect_t(cookie, entity, false));
        } else {
          dout(10) << " can't remove: no watch by " << entity << dendl;
        }
      }
    }
    break;
  ...
}
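Because the watchers are persisted in the object's object_info, they can be inspected from any client; this is the same information the rados listwatchers command prints. A minimal sketch using librados' list_watchers call (the object name is a placeholder):

#include <rados/librados.hpp>
#include <list>
#include <iostream>

// Print every watcher currently registered on an object.
void dump_watchers(librados::IoCtx& io, const std::string& oid)
{
  std::list<obj_watch_t> watchers;
  if (io.list_watchers(oid, &watchers) < 0)
    return;
  for (std::list<obj_watch_t>::iterator i = watchers.begin();
       i != watchers.end(); ++i)
    std::cout << "gid=" << i->watcher_id << " cookie=" << i->cookie
              << " addr=" << i->addr
              << " timeout=" << i->timeout_seconds << "s" << std::endl;
}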
A question arises here: since the watchers are persisted in the object's oi, what happens if a client restarts right after calling watch? Do its watchers stay in oi forever? Clearly they should not; the watch timeout timer armed in Watch::connect below, together with the ping mechanism of section 2.5, is what eventually cleans them up.
Once the primary replica has received write-completion acks from all replicas, it calls do_osd_op_effects, which walks the watch_connects list and performs a connect for each watch:
void ReplicatedPG::do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn)
{
  entity_name_t entity = ctx->reqid.name;
  dout(15) << "do_osd_op_effects " << entity << " con " << conn.get() << dendl;
  // disconnects first
  complete_disconnect_watches(ctx->obc, ctx->watch_disconnects);
  assert(conn);
  boost::intrusive_ptr<OSD::Session> session((OSD::Session *)conn->get_priv());
  if (!session.get())
    return;
  session->put(); // get_priv() takes a ref, and so does the intrusive_ptr
  for (list<pair<watch_info_t,bool> >::iterator i = ctx->watch_connects.begin();
       i != ctx->watch_connects.end();
       ++i) {
    pair<uint64_t, entity_name_t> watcher(i->first.cookie, entity);
    dout(15) << "do_osd_op_effects applying watch connect on session "
             << session.get() << " watcher " << watcher << dendl;
    WatchRef watch;
    if (ctx->obc->watchers.count(watcher)) {
      dout(15) << "do_osd_op_effects found existing watch watcher " << watcher
               << dendl;
      watch = ctx->obc->watchers[watcher];
    } else {
      dout(15) << "do_osd_op_effects new watcher " << watcher
               << dendl;
      watch = Watch::makeWatchRef(
        this, osd, ctx->obc, i->first.timeout_seconds,
        i->first.cookie, entity, conn->get_peer_addr());
      ctx->obc->watchers.insert(
        make_pair(
          watcher,
          watch));
    }
    watch->connect(conn, i->second);
  }
  ...
}
void Watch::connect(ConnectionRef con, bool _will_ping)
{
  if (conn == con) {
    dout(10) << __func__ << " con " << con << " - already connected" << dendl;
    return;
  }
  dout(10) << __func__ << " con " << con << dendl;
  conn = con;
  will_ping = _will_ping;
  ...
  if (will_ping) {
    last_ping = ceph_clock_now(NULL);
    register_cb(); // arm the watch timeout timer
  } else {
    unregister_cb();
  }
}
2.4 The watch LingerOp reply
The reply to the watch op arrives at the client in handle_osd_op_reply, which runs the callback registered on the op; for a watch registration that is the C_Linger_Commit set up in _send_linger. It completes info->on_reg_commit (the C_SaferCond onfinish that IoCtxImpl::watch is blocked on) and updates:
info->registered = true;
info->pobjver = NULL;
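Roughly, that callback path looks like the sketch below, paraphrased from the Hammer-era Objecter rather than quoted verbatim; treat the exact signature as an assumption:

// Sketch: handle_osd_op_reply runs the op's oncommit_sync callback, which for
// a watch registration is C_Linger_Commit; its finish() lands here.
void Objecter::_linger_commit(LingerOp *info, int r)
{
  LingerOp::unique_lock wl(info->watch_lock);
  if (info->on_reg_commit) {
    info->on_reg_commit->complete(r); // wakes the C_SaferCond in IoCtxImpl::watch
    info->on_reg_commit = NULL;
  }
  info->registered = true; // later _send_linger calls take the RECONNECT branch
  info->pobjver = NULL;    // only the initial registration reports a version
}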
2.5 watch ping
- When the OSD receives a notify, it fans the message out to every watcher (subscriber). The watcher list is maintained by a heartbeat: watchers whose clients appear to have died are dropped, since there is no point sending notify messages to them.
- To keep receiving notifies, a client must periodically ping the OSD to prove it is still alive.
- The watch ping is driven by the Objecter::tick timer thread.
- A ping is sent every objecter_tick_interval = 5s.
- The tick thread polls the LingerOps recorded in each OSDSession and calls _send_linger_ping for every one of them.
- On receiving a ping, the OSD re-arms the watch timeout timer.
- The watch ping op gets a reply as well: _send_linger_ping registers a C_Linger_Ping onack callback whose main job is to update info->watch_valid_thru = sent. On top of this, librados provides watch_check for testing whether pings have gone unanswered for too long and for surfacing watch errors; a small sketch follows this list.
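A minimal sketch of that check from the application side, assuming an initialized IoCtx and an existing watch handle; the recovery policy shown (unwatch then re-watch) is illustrative, not the only option:

#include <rados/librados.hpp>

// Returns true if the watch is (again) healthy. watch_check() reports how
// long ago the watch was last confirmed by the OSD, or a negative error
// (e.g. after missed pings) meaning the registration must be redone.
bool ensure_watch(librados::IoCtx& io, const std::string& oid,
                  uint64_t& handle, librados::WatchCtx2* ctx)
{
  if (io.watch_check(handle) >= 0)
    return true;                 // pings are being acknowledged
  io.unwatch2(handle);           // drop the stale registration
  return io.watch2(oid, &handle, ctx) == 0; // re-register from scratch
}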
2.6 Notify
int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
                                uint64_t timeout_ms,
                                bufferlist *preply_bl,
                                char **preply_buf, size_t *preply_buf_len)
{
  Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); // info->is_watch stays false, marking this client as the notify sender rather than a subscriber
  C_SaferCond notify_finish_cond;
  Context *notify_finish = new C_notify_Finish(client->cct, &notify_finish_cond,
                                               objecter, linger_op, preply_bl,
                                               preply_buf, preply_buf_len);
  (void) notify_finish;
  uint32_t timeout = notify_timeout; // notify timeout; the OSD uses this value to arm its notify-completion timer
  if (timeout_ms)
    timeout = timeout_ms / 1000;
  // Construct RADOS op
  ::ObjectOperation rd; // a read op, since nothing needs to be persisted
  prepare_assert_ops(&rd);
  bufferlist inbl;
  rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl); // wrap the payload in a CEPH_OSD_OP_NOTIFY op
  // Issue RADOS op
  C_SaferCond onack;
  version_t objver;
  // send the notify
  objecter->linger_notify(linger_op,
                          rd, snap_seq, inbl, NULL,
                          &onack, &objver);
  ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
  // handle_osd_op_reply wakes this condition variable
  int r = onack.wait();
  ldout(client->cct, 10) << __func__ << " linger op " << linger_op
                         << " acked (" << r << ")" << dendl;
  if (r == 0) {
    ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
                           << linger_op << dendl;
    // handle_watch_notify wakes this condition variable
    r = notify_finish_cond.wait();
  } else {
    ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
                           << r << dendl;
    notify_finish_cond.wait();
  }
  // unlike a watch LingerOp, the notify LingerOp is one-shot and is cancelled once done
  objecter->linger_cancel(linger_op);
  set_sync_op_version(objver);
  return r;
}
2.7 OSD handling of the notify message
Note: if a watcher's client or network had already failed before the OSD received the notify, the watch eventually times out and the OSD calls handle_watch_timeout to update the obc->watchers list; once all replicas have applied that update, do_osd_op_effects removes the corresponding watch via watch->remove(i->send_disconnect).
The OSD then simply skips the dead watcher when fanning out the notify and replies to the notifying client directly.
2.8 OSD handling of the notify ack
The ack is handled by Notify::complete_watcher, which records the watcher's reply, drops the watcher from the pending set, and then calls:
Notify::maybe_complete_notify, which is reached from several places:
- Notify::do_timeout, when the notify has gone unanswered past its timeout
- Notify::complete_watcher, on a notify ack
- Notify::complete_watcher_remove, when a watching client's connection is torn down
- Notify::init, when the OSD first receives the client's notify
void Notify::maybe_complete_notify()
{
  dout(10) << "maybe_complete_notify -- "
           << watchers.size()
           << " in progress watchers " << dendl;
  // timed_out is set to true when the notify times out
  if (watchers.empty() || timed_out) {
    // prepare reply
    bufferlist bl;
    ::encode(notify_replies, bl);
    list<pair<uint64_t,uint64_t> > missed;
    // whatever is left in the watchers set never sent a notify ack; report those back to the notifier
    for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) {
      missed.push_back(make_pair((*p)->get_watcher_gid(),
                                 (*p)->get_cookie()));
    }
    ::encode(missed, bl);
    // send a CEPH_WATCH_EVENT_NOTIFY_COMPLETE message back to the notifier
    bufferlist empty;
    MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
                                         CEPH_WATCH_EVENT_NOTIFY_COMPLETE, empty));
    reply->notifier_gid = client_gid;
    reply->set_data(bl);
    if (timed_out)
      reply->return_code = -ETIMEDOUT;
    client->send_message(reply);
    unregister_cb();
    complete = true;
  }
}
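On the notifier, C_notify_Finish unpacks this payload. The layout mirrors the encode above: first the per-watcher replies, then the missed list. A hedged decode sketch (the container types are inferred from the encode shown, not copied from the librados source):

#include <map>
#include <list>
#include <utility>
#include "include/buffer.h"
#include "include/encoding.h"

// Decode the NOTIFY_COMPLETE payload: first the per-watcher ack payloads
// keyed by (watcher gid, cookie), then the (gid, cookie) pairs that never acked.
void decode_notify_reply(bufferlist& bl,
                         std::multimap<std::pair<uint64_t,uint64_t>, bufferlist> *acks,
                         std::list<std::pair<uint64_t,uint64_t> > *missed)
{
  bufferlist::iterator p = bl.begin();
  ::decode(*acks, p);
  ::decode(*missed, p);
}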
2.9 Notifier-side handling of the notify completion
On the client, all notify-type messages are processed by handle_watch_notify; for the notifier this is where C_notify_Finish runs and notify_finish_cond is signalled.
3. Summary
- A client can send notify messages to other clients and, at the same time, receive notifies from them.
- A client also receives the completion of the notify it sent itself. The cookie identifies the LingerOp (there may be two or more: one created when the client registered its watch, another created for the notify), and the LingerOp's is_watch flag tells whether an incoming message answers the client's own notify.
- The notifier also learns which watchers failed to acknowledge a notify.
- The linger_check interface, surfaced through librados, lets callers check whether a watch ping has gone too long without a response from the OSD.
Reference:
http://blog.wjin.org/posts/ceph-watchnotify-mechanism.html