本篇内容介绍了“RadosClient OSDC怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
RadosClient.h
// librados::RadosClient: client-side core of librados. It holds the MonClient,
// the Messenger and the Objecter used to talk to the cluster, and exposes
// connection, pool, IoCtx and mon/osd/pg command operations.
class librados::RadosClient : public Dispatcher // derives from Dispatcher (message dispatch interface)
{
public:
using Dispatcher::cct;
md_config_t *conf; // configuration
private:
enum {
DISCONNECTED,
CONNECTING,
CONNECTED,
} state; // connection state
MonClient monclient; // monitor client (monc)
Messenger *messenger; // network messaging interface
uint64_t instance_id;
// message-dispatch handlers (Dispatcher interface)
bool _dispatch(Message *m);
bool ms_dispatch(Message *m);
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
Objecter *objecter; // Objecter from the OSDC module; sends the assembled op messages
Mutex lock;
Cond cond;
SafeTimer timer; // timer
int refcnt;
version_t log_last_version;
rados_log_callback_t log_cb;
void *log_cb_arg;
string log_watch;
int wait_for_osdmap();
public:
Finisher finisher; // callback (completion) executor; also handed to the Objecter
explicit RadosClient(CephContext *cct_);
~RadosClient();
int ping_monitor(string mon_id, string *result);
int connect(); // connect to the cluster
void shutdown();
int watch_flush();
int async_watch_flush(AioCompletionImpl *c);
uint64_t get_instance_id();
int wait_for_latest_osdmap();
// create an IoCtxImpl for a pool, by pool name or by pool id
int create_ioctx(const char *name, IoCtxImpl **io);
int create_ioctx(int64_t, IoCtxImpl **io);
int get_fsid(std::string *s);
// pool operations
int64_t lookup_pool(const char *name);
bool pool_requires_alignment(int64_t pool_id);
int pool_requires_alignment2(int64_t pool_id, bool *requires);
uint64_t pool_required_alignment(int64_t pool_id);
int pool_required_alignment2(int64_t pool_id, uint64_t *alignment);
int pool_get_auid(uint64_t pool_id, unsigned long long *auid);
// NOTE(review): the out-parameter is named `auid` but, given the function
// name, presumably receives the pool name; the name looks copy-pasted.
int pool_get_name(uint64_t pool_id, std::string *auid);
int pool_list(std::list<std::pair<int64_t, string> >& ls);
int get_pool_stats(std::list<string>& ls, map<string,::pool_stat_t>& result);
int get_fs_stats(ceph_statfs& result);
/*
-1 was set as the default value and monitor will pickup the right crush rule with below order:
a) osd pool default crush replicated ruleset
b) the first ruleset in crush ruleset
c) error out if no value is found
*/
// create a pool, synchronously or asynchronously
int pool_create(string& name, unsigned long long auid=0, int16_t crush_rule=-1);
int pool_create_async(string& name, PoolAsyncCompletionImpl *c, unsigned long long auid=0,
int16_t crush_rule=-1);
int pool_get_base_tier(int64_t pool_id, int64_t* base_tier);
// delete a pool, synchronously or asynchronously
int pool_delete(const char *name);
int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c);
int blacklist_add(const string& client_address, uint32_t expire_seconds);
// monitor commands: handed to the monitors via monclient.start_mon_command
int mon_command(const vector<string>& cmd, const bufferlist &inbl,
bufferlist *outbl, string *outs);
int mon_command(int rank,
const vector<string>& cmd, const bufferlist &inbl,
bufferlist *outbl, string *outs);
int mon_command(string name,
const vector<string>& cmd, const bufferlist &inbl,
bufferlist *outbl, string *outs);
// OSD commands: sent to an OSD via objecter->osd_command
int osd_command(int osd, vector<string>& cmd, const bufferlist& inbl,
bufferlist *poutbl, string *prs);
// PG commands: sent via objecter->pg_command
int pg_command(pg_t pgid, vector<string>& cmd, const bufferlist& inbl,
bufferlist *poutbl, string *prs);
void handle_log(MLog *m);
int monitor_log(const string& level, rados_log_callback_t cb, void *arg);
void get();
bool put();
void blacklist_self(bool set);
};
connect() 连接
// Connect this client to the cluster.
// Visible steps: build the initial monmap, create the client Messenger and the
// Objecter, register both as dispatchers and start the Messenger, initialize
// and authenticate the MonClient, then start the Objecter, timer and finisher
// and mark the client CONNECTED.
// Returns -EINPROGRESS if a connect is already in progress and -EISCONN if
// already connected. Failures jump to `out` with `err` set (-ENOMEM when the
// Messenger or Objecter could not be created).
// NOTE(review): the rest of this function, including the `out:` label, is
// elided in this excerpt; `lock` is acquired below and no matching unlock is
// visible here.
int librados::RadosClient::connect()
{
common_init_finish(cct);
int err;
// already connected?
if (state == CONNECTING)
return -EINPROGRESS;
if (state == CONNECTED)
return -EISCONN;
state = CONNECTING;
// get monmap
err = monclient.build_initial_monmap(); // build the initial monitor map from the configuration
if (err < 0)
goto out;
err = -ENOMEM;
messenger = Messenger::create_client_messenger(cct, "radosclient"); // create the messaging (network) layer
if (!messenger)
goto out;
// require OSDREPLYMUX feature. this means we will fail to talk to
// old servers. this is necessary because otherwise we won't know
// how to decompose the reply data into its constituent pieces.
messenger->set_default_policy(Messenger::Policy::lossy_client(0, CEPH_FEATURE_OSDREPLYMUX));
ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl;
ldout(cct, 1) << "starting objecter" << dendl;
// create the Objecter; nothrow new yields nullptr on allocation failure, checked below
objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
&finisher,
cct->_conf->rados_mon_op_timeout,
cct->_conf->rados_osd_op_timeout);
if (!objecter)
goto out;
objecter->set_balanced_budget();
// give the MonClient the messenger
monclient.set_messenger(messenger);
// initialize the Objecter
objecter->init();
// register dispatchers with the messenger: the Objecter first, then this client
messenger->add_dispatcher_tail(objecter);
messenger->add_dispatcher_tail(this);
// start the messenger
messenger->start();
ldout(cct, 1) << "setting wanted keys" << dendl;
monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
ldout(cct, 1) << "calling monclient init" << dendl;
// initialize the MonClient; on failure shut down and bail out
err = monclient.init();
if (err) {
ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
shutdown();
goto out;
}
// authenticate with the monitors, bounded by client_mount_timeout
err = monclient.authenticate(conf->client_mount_timeout);
if (err) {
ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
shutdown();
goto out;
}
// name this client after the global id assigned by the monitors
messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
objecter->set_client_incarnation(0);
// start the Objecter
objecter->start();
lock.Lock();
// initialize the timer
timer.init();
monclient.renew_subs();
// start the finisher that runs completion callbacks
finisher.start();
// mark the client as connected
state = CONNECTED;
instance_id = monclient.get_global_id();
...
}
create_ioctx 根据pool创建ioctx
// Create an IoCtxImpl for the pool named `name` and return it through `io`.
// The IoCtxImpl is allocated with `new`; returns 0 on the path shown.
// NOTE: the lines between the pool lookup and the allocation are elided in
// this excerpt (presumably they return an error when the pool is not found).
int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
{
// look up the pool id by name
int64_t poolid = lookup_pool(name);
...
// create the IoCtxImpl bound to this client, its Objecter and the pool, with snapshot CEPH_NOSNAP
*io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
return 0;
}
Mon、OSD、PG 命令操作
int librados::RadosClient::mon_command(const vector<string>& cmd,
const bufferlist &inbl,
bufferlist *outbl, string *outs)
{
// mc start_mon_command 发送到monitor
monclient.start_mon_command(cmd, inbl, outbl, outs,
new C_SafeCond(&mylock, &cond, &done, &rval));
}
int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
const bufferlist& inbl,
bufferlist *poutbl, string *prs)
{
// 发送到osd
int r = objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
new C_SafeCond(&mylock, &cond, &done, &ret));
}
// Send a command addressed to placement group `pgid` through the Objecter
// and block until it completes. Returns the Objecter's error if submission
// fails, otherwise the result code delivered to the completion.
int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
                                      const bufferlist& inbl,
                                      bufferlist *poutbl, string *prs)
{
  // Completion state shared with the C_SafeCond callback; a dedicated
  // condition variable avoids shadowing the `cond` member.
  Mutex mylock("RadosClient::pg_command::mylock");
  Cond mycond;
  bool done = false;
  int ret = 0;
  ceph_tid_t tid = 0;

  lock.Lock();
  int r = objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
                               new C_SafeCond(&mylock, &mycond, &done, &ret));
  lock.Unlock();
  if (r != 0)
    return r;  // not submitted; do not wait for a completion

  // Wait until the completion sets `done`; re-check after every wakeup.
  mylock.Lock();
  while (!done)
    mycond.Wait(mylock);
  mylock.Unlock();
  return ret;
}
IoCtxImpl
librados::IoCtx的实现IoCtxImpl
把请求封装成ObjectOperation 类(osdc 中的)
把相关的pool信息添加到里面,封装成Objecter::Op对象
调用相应的函数 objecter->op_submit 发送给相应的OSD
操作完成后,调用相应的回调函数。
如rados_write
// C API: write `len` bytes from `buf` to object `o` at byte offset `off`,
// using the IoCtxImpl behind the opaque handle `io`.
// Returns the result of IoCtxImpl::write. The original version computed this
// value but fell off the end of the function without returning it, which is
// undefined behavior for a function returning int.
extern "C" int rados_write(rados_ioctx_t io, const char *o, const char *buf, size_t len, uint64_t off)
{
  // rados_ioctx_t is an opaque handle to a librados::IoCtxImpl.
  librados::IoCtxImpl *ctx = static_cast<librados::IoCtxImpl *>(io);
  object_t oid(o);
  bufferlist bl;
  bl.append(buf, len);  // wrap the caller's bytes in a bufferlist
  int retval = ctx->write(oid, bl, len, off);
  return retval;
}
调用IoCtxImpl::write
int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
size_t len, uint64_t off)
{
::ObjectOperation op;
prepare_assert_ops(&op); // assert ops
bufferlist mybl;
mybl.substr_of(bl, 0, len);
op.write(off, mybl); // 封装到op.write Objecter.h ObjectOperation write
return operate(oid, &op, NULL); // IoCtxImpl::operate
}
// Turn the ObjectOperation `o` into an Objecter mutate op for `oid` and submit it.
// NOTE(review): this is an excerpt. `ut`, `oncommit` and `ver` are not declared
// in the visible code, nothing here waits for the op to complete, and no value
// is returned as shown. `o->ops[0]` is read without checking that `o` holds any
// op, which is out of bounds for an empty operation.
int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
ceph::real_time *pmtime, int flags)
{
// opcode of the first op (not otherwise used in the visible code)
int op = o->ops[0].op.op;
// build the Objecter::Op from this context's locator/snap context and the ops in `o`
Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc,
*o, snapc, ut, flags,
NULL, oncommit, &ver);
// hand the op to the Objecter for sending
objecter->op_submit(objecter_op);
}
AioCompletionImpl
OSDC
ObjectOperation
struct ObjectOperation {
vector<OSDOp> ops; // ops集合
int flags;
int priority;
vector<bufferlist*> out_bl; // 输出bufferlist
vector<Context*> out_handler; // 回调函数
vector<int*> out_rval; // 返回码集合
size_t size() { // op个数
return ops.size();
}
/**
* This is a more limited form of C_Contexts, but that requires
* a ceph_context which we don't have here.
*/
// 用户添加回调函数
class C_TwoContexts : public Context {
Context *first;
Context *second;
};
/**
* Add a callback to run when this operation completes,
* after any other callbacks for it.
*/
// 添加回调函数
void add_handler(Context *extra) {
size_t last = out_handler.size() - 1;
Context *orig = out_handler[last];
if (orig) {
Context *wrapper = new C_TwoContexts(orig, extra);
out_handler[last] = wrapper;
} else {
out_handler[last] = extra;
}
}
// 添加操作
OSDOp& add_op(int op) {
int s = ops.size();
ops.resize(s+1);
ops[s].op.op = op;
out_bl.resize(s+1);
out_bl[s] = NULL;
out_handler.resize(s+1);
out_handler[s] = NULL;
out_rval.resize(s+1);
out_rval[s] = NULL;
return ops[s];
}
// 添加data
void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) {
OSDOp& osd_op = add_op(op);
osd_op.op.extent.offset = off;
osd_op.op.extent.length = len;
osd_op.indata.claim_append(bl);
}
void add_clone_range(int op, uint64_t off, uint64_t len,
const object_t& srcoid, uint64_t srcoff,
snapid_t srcsnapid) {}
void add_xattr(int op, const char *name, const bufferlist& data) {}
void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
uint8_t cmp_mode, const bufferlist& data) {}
// 添加call method
void add_call(int op, const char *cname, const char *method,
bufferlist &indata,
bufferlist *outbl, Context *ctx, int *prval) {
OSDOp& osd_op = add_op(op);
unsigned p = ops.size() - 1;
out_handler[p] = ctx;
out_bl[p] = outbl;
out_rval[p] = prval;
osd_op.op.cls.class_len = strlen(cname);
osd_op.op.cls.method_len = strlen(method);
osd_op.op.cls.indata_len = indata.length();
osd_op.indata.append(cname, osd_op.op.cls.class_len);
osd_op.indata.append(method, osd_op.op.cls.method_len);
osd_op.indata.append(indata);
}
void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
epoch_t start_epoch) {}
void add_pgls_filter(int op, uint64_t count, const bufferlist& filter,
collection_list_handle_t cookie, epoch_t start_epoch) {}
void add_alloc_hint(int op, uint64_t expected_object_size,
uint64_t expected_write_size) {}
// ------
// pg 操作
void pg_ls(uint64_t count, bufferlist& filter,
collection_list_handle_t cookie, epoch_t start_epoch) {}
void pg_nls(uint64_t count, const bufferlist& filter,
collection_list_handle_t cookie, epoch_t start_epoch) {}
// 创建 操作
void create(bool excl) {
OSDOp& o = add_op(CEPH_OSD_OP_CREATE);
o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
}
// 状态
struct C_ObjectOperation_stat : public Context {
bufferlist bl;
uint64_t *psize;
ceph::real_time *pmtime;
time_t *ptime;
struct timespec *pts;
int *prval;
// 完成大小,时间等
void finish(int r) {}
}
};
// 查看状态,获取C_ObjectOperation_stat
void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {}
void stat(uint64_t *psize, time_t *ptime, int *prval) {}
void stat(uint64_t *psize, struct timespec *pts, int *prval) {}
// object data
// 读操作
void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval,
Context* ctx) {
bufferlist bl;
add_data(CEPH_OSD_OP_READ, off, len, bl);
unsigned p = ops.size() - 1;
out_bl[p] = pbl;
out_rval[p] = prval;
out_handler[p] = ctx;
}
void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m,
bufferlist *data_bl, int *prval) {}
// 写操作
void write(uint64_t off, bufferlist& bl,
uint64_t truncate_size,
uint32_t truncate_seq) {
add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl); // 添加data, 将WRITE存入ops,将数据放在op中
OSDOp& o = *ops.rbegin();
o.op.extent.truncate_size = truncate_size;
o.op.extent.truncate_seq = truncate_seq;
}
void write(uint64_t off, bufferlist& bl) {}
void write_full(bufferlist& bl) {}
void append(bufferlist& bl) {}
void zero(uint64_t off, uint64_t len) {}
void truncate(uint64_t off) {}
void remove() {}
void mapext(uint64_t off, uint64_t len) {}
void sparse_read(uint64_t off, uint64_t len) {}
void clone_range(const object_t& src_oid, uint64_t src_offset, uint64_t len,
uint64_t dst_offset) {}
// object attrs
// 属性操作
void getxattr(const char *name, bufferlist *pbl, int *prval) {}
void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {}
void setxattr(const char *name, const bufferlist& bl) {}
void setxattr(const char *name, const string& s) {}
void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
const bufferlist& bl) {}
void rmxattr(const char *name) {}
void setxattrs(map<string, bufferlist>& attrs) {}
void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {}
// trivialmap
void tmap_update(bufferlist& bl) {}
void tmap_put(bufferlist& bl) {}
void tmap_get(bufferlist *pbl, int *prval) {}
void tmap_get() {}
void tmap_to_omap(bool nullok=false) {}
// objectmap
void omap_get_keys(const string &start_after,
uint64_t max_to_get,
std::set<std::string> *out_set,
int *prval) {
OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
bufferlist bl;
::encode(start_after, bl);
::encode(max_to_get, bl);
op.op.extent.offset = 0;
op.op.extent.length = bl.length();
op.indata.claim_append(bl);
if (prval || out_set) {
unsigned p = ops.size() - 1;
C_ObjectOperation_decodekeys *h =
new C_ObjectOperation_decodekeys(out_set, prval);
out_handler[p] = h;
out_bl[p] = &h->bl;
out_rval[p] = prval;
}
}
void omap_get_vals(const string &start_after,
const string &filter_prefix,
uint64_t max_to_get,
std::map<std::string, bufferlist> *out_set,
int *prval) {}
void omap_get_vals_by_keys(const std::set<std::string> &to_get,
std::map<std::string, bufferlist> *out_set,
int *prval) {}
void omap_cmp(const std::map<std::string, pair<bufferlist,int> > &assertions,
int *prval) {}
void copy_get(object_copy_cursor_t *cursor,
uint64_t max,
uint64_t *out_size,
ceph::real_time *out_mtime,
std::map<std::string,bufferlist> *out_attrs,
bufferlist *out_data,
bufferlist *out_omap_header,
bufferlist *out_omap_data,
vector<snapid_t> *out_snaps,
snapid_t *out_snap_seq,
uint32_t *out_flags,
uint32_t *out_data_digest,
uint32_t *out_omap_digest,
vector<pair<osd_reqid_t, version_t> > *out_reqids,
uint64_t *truncate_seq,
uint64_t *truncate_size,
int *prval) {}
void undirty() {}
struct C_ObjectOperation_isdirty : public Context {};
void is_dirty(bool *pisdirty, int *prval) {}
void omap_get_header(bufferlist *bl, int *prval) {}
void omap_set(const map<string, bufferlist> &map) {}
void omap_set_header(bufferlist &bl) {}
void omap_clear() {}
void omap_rm_keys(const std::set<std::string> &to_remove) {}
// object classes
void call(const char *cname, const char *method, bufferlist &indata) {}
void call(const char *cname, const char *method, bufferlist &indata,
bufferlist *outdata, Context *ctx, int *prval) {}
void rollback(uint64_t snapid) {}
void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
version_t src_version, unsigned flags,
unsigned src_fadvise_flags) {}
};
OSDOp osd_types.h
struct OSDOp {
ceph_osd_op op; // 操作
sobject_t soid; // oid
bufferlist indata, outdata; // 输入输出data
int32_t rval; // 返回码
};
Objecter
// Objecter (OSDC). As a Dispatcher it can be registered with a Messenger to
// receive messages; clients use it to build and submit OSD ops. Only a
// subset of its members is shown in this excerpt.
class Objecter : public md_config_obs_t, public Dispatcher {
public:
  Messenger *messenger;  // messaging layer
  MonClient *monc;       // monitor client (monc)
  Finisher *finisher;
private:
  OSDMap *osdmap;        // the OSDMap used for op placement
public:
  using Dispatcher::cct;
  std::multimap<string,string> crush_location;
  atomic_t initialized;
private:
  atomic64_t last_tid;
  atomic_t inflight_ops;
  atomic_t client_inc;
  uint64_t max_linger_id;
  atomic_t num_unacked;
  atomic_t num_uncommitted;
  atomic_t global_op_flags; // flags which are applied to each IO op
  bool keep_balanced_budget;
  bool honor_osdmap_full;
public:
  void maybe_request_map();
private:
  void _maybe_request_map();
  version_t last_seen_osdmap_version;
  version_t last_seen_pgmap_version;
  mutable boost::shared_mutex rwlock;
  using lock_guard = std::unique_lock<decltype(rwlock)>;
  using unique_lock = std::unique_lock<decltype(rwlock)>;
  using shared_lock = boost::shared_lock<decltype(rwlock)>;
  using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
  ceph::timer<ceph::mono_clock> timer;
  PerfCounters *logger;
  uint64_t tick_event;
  void start_tick();
  void tick();
  void update_crush_location();
public:
  /*** track pending operations ***/
  // read
  struct OSDSession;
  // Target of an op (object, pg and osd mapping); fields elided here.
  // The missing ';' after this definition was a syntax error.
  struct op_target_t {};
  // A single in-flight operation; fields elided here.
  struct Op : public RefCountedObject {};
};
op_target_t
操作目标,封装pg信息,osd信息
// op_target_t: where an op is directed — the base/target object and locator,
// the pg it last mapped to, and the OSDs serving that pg at that mapping.
struct op_target_t {
int flags;
object_t base_oid;
object_locator_t base_oloc;
object_t target_oid; // target oid
object_locator_t target_oloc; // target object locator
// whether the op targets base_pgid rather than base_oid
bool precalc_pgid; ///< true if we are directed at base_pgid, not base_oid
// explicitly specified pgid
pg_t base_pgid; ///< explicit pg target, if any
pg_t pgid; ///< last pg we mapped to
unsigned pg_num; ///< last pg_num we mapped to
unsigned pg_num_mask; ///< last pg_num_mask we mapped to
// up OSDs
vector<int> up; ///< set of up osds for last pg we mapped to
// acting OSDs
vector<int> acting; ///< set of acting osds for last pg we mapped to
// primaries
int up_primary; ///< primary for last pg we mapped to based on the up set
int acting_primary; ///< primary for last pg we mapped to based on the
/// acting set
// pool size
int size; ///< the size of the pool when we were last mapped
// pool min_size
int min_size; ///< the min size of the pool when we were last mapped
// hobject_t sort order flag
bool sort_bitwise; ///< whether the hobject_t sort order is bitwise
// whether a replica was used
bool used_replica;
bool paused;
int osd; ///< the final target osd, or -1
};
操作
// Objecter::Op: one operation tracked by the Objecter — its target, the ops
// to execute, and where results and completions are delivered.
struct Op : public RefCountedObject {
OSDSession *session; // OSD session (connection) for this op
int incarnation;
op_target_t target; // where the op is sent (object, pg, osd)
ConnectionRef con; // for rx buffer only
uint64_t features; // explicitly specified op features
vector<OSDOp> ops; // the ops to execute
snapid_t snapid;
SnapContext snapc;
ceph::real_time mtime;
bufferlist *outbl;
vector<bufferlist*> out_bl;
vector<Context*> out_handler;
vector<int*> out_rval;
int priority;
Context *onack, *oncommit;
uint64_t ontimeout;
Context *oncommit_sync; // used internally by watch/notify
ceph_tid_t tid;
eversion_t replay_version; // for op replay
int attempts;
version_t *objver;
epoch_t *reply_epoch;
ceph::mono_time stamp;
epoch_t map_dne_bound;
bool budgeted;
/// true if we should resend this message on failure
bool should_resend;
/// true if the throttle budget is get/put on a series of OPs,
/// instead of per OP basis, when this flag is set, the budget is
/// acquired before sending the very first OP of the series and
/// released upon receiving the last OP reply.
bool ctx_budgeted;
int *data_offset;
epoch_t last_force_resend;
osd_reqid_t reqid; // explicitly setting reqid
};
分片 Striper
扩展 ObjectExtent
记录分片信息
// ObjectExtent: one striped piece of a logical I/O — which object it lands
// in, the byte range within that object, and which ranges of the caller's
// buffer map onto it.
class ObjectExtent {
public:
object_t oid; // object id
uint64_t objectno; // object sequence number
uint64_t offset; // in object: offset within the object
uint64_t length; // in object: length of this extent
uint64_t truncate_size; // in object
object_locator_t oloc; // object locator (pool etc)
vector<pair<uint64_t,uint64_t> > buffer_extents; // off -> len. extents in buffer being mapped (may be fragmented bc of striping!)
};
“RadosClient OSDC怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注天达云网站,小编将为大家输出更多高质量的实用文章!