35 files changed, 2558 insertions, 1920 deletions
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt index 0b302a11718a..d7f011ddc150 100644 --- a/Documentation/filesystems/ceph.txt +++ b/Documentation/filesystems/ceph.txt @@ -62,6 +62,18 @@ subdirectories, and a summation of all nested file sizes. This makes the identification of large disk space consumers relatively quick, as no 'du' or similar recursive scan of the file system is required. +Finally, Ceph also allows quotas to be set on any directory in the system. +The quota can restrict the number of bytes or the number of files stored +beneath that point in the directory hierarchy. Quotas can be set using +extended attributes 'ceph.quota.max_files' and 'ceph.quota.max_bytes', eg: + + setfattr -n ceph.quota.max_bytes -v 100000000 /some/dir + getfattr -n ceph.quota.max_bytes /some/dir + +A limitation of the current quotas implementation is that it relies on the +cooperation of the client mounting the file system to stop writers when a +limit is reached. A modified or adversarial client cannot be prevented +from writing as much data as it needs. Mount Syntax ============ @@ -137,6 +149,10 @@ Mount Options noasyncreaddir Do not use the dcache as above for readdir. + noquotadf + Report overall filesystem usage in statfs instead of using the root + directory quota. + More Information ================ diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 1e03b04819c8..07dc5419bd63 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -32,6 +32,7 @@ #include <linux/ceph/osd_client.h> #include <linux/ceph/mon_client.h> #include <linux/ceph/cls_lock_client.h> +#include <linux/ceph/striper.h> #include <linux/ceph/decode.h> #include <linux/parser.h> #include <linux/bsearch.h> @@ -200,95 +201,81 @@ struct rbd_client { }; struct rbd_img_request; -typedef void (*rbd_img_callback_t)(struct rbd_img_request *); - -#define BAD_WHICH U32_MAX /* Good which or bad which, which? */ - -struct rbd_obj_request; -typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *); enum obj_request_type { - OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES + OBJ_REQUEST_NODATA = 1, + OBJ_REQUEST_BIO, /* pointer into provided bio (list) */ + OBJ_REQUEST_BVECS, /* pointer into provided bio_vec array */ + OBJ_REQUEST_OWN_BVECS, /* private bio_vec array, doesn't own pages */ }; enum obj_operation_type { + OBJ_OP_READ = 1, OBJ_OP_WRITE, - OBJ_OP_READ, OBJ_OP_DISCARD, }; -enum obj_req_flags { - OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */ - OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */ - OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */ - OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */ +/* + * Writes go through the following state machine to deal with + * layering: + * + * need copyup + * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP + * | ^ | + * v \------------------------------/ + * done + * ^ + * | + * RBD_OBJ_WRITE_FLAT + * + * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether + * there is a parent or not. + */ +enum rbd_obj_write_state { + RBD_OBJ_WRITE_FLAT = 1, + RBD_OBJ_WRITE_GUARD, + RBD_OBJ_WRITE_COPYUP, }; struct rbd_obj_request { - u64 object_no; - u64 offset; /* object start byte */ - u64 length; /* bytes from offset */ - unsigned long flags; - - /* - * An object request associated with an image will have its - * img_data flag set; a standalone object request will not. - * - * A standalone object request will have which == BAD_WHICH - * and a null obj_request pointer. 
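To make the copyup transition above concrete, here is a minimal illustrative sketch (not part of the patch) of how a dispatcher could advance rbd_obj_write_state on each OSD reply; handle_copyup() is a hypothetical stand-in for the real copyup path:

    static int handle_copyup(struct rbd_obj_request *obj_req);  /* hypothetical */

    static int advance_write_state(struct rbd_obj_request *obj_req, int result)
    {
        switch (obj_req->write_state) {
        case RBD_OBJ_WRITE_FLAT:
            return result;                  /* no parent: done after one reply */
        case RBD_OBJ_WRITE_GUARD:
            if (result == -ENOENT) {
                /* guard tripped: object doesn't exist, copy up from parent */
                obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
                return handle_copyup(obj_req);
            }
            return result;
        case RBD_OBJ_WRITE_COPYUP:
            return result;                  /* copyup + write finished */
        }
        return -EINVAL;
    }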
- * - * An object request initiated in support of a layered image - * object (to check for its existence before a write) will - * have which == BAD_WHICH and a non-null obj_request pointer. - * - * Finally, an object request for rbd image data will have - * which != BAD_WHICH, and will have a non-null img_request - * pointer. The value of which will be in the range - * 0..(img_request->obj_request_count-1). - */ + struct ceph_object_extent ex; union { - struct rbd_obj_request *obj_request; /* STAT op */ - struct { - struct rbd_img_request *img_request; - u64 img_offset; - /* links for img_request->obj_requests list */ - struct list_head links; - }; + bool tried_parent; /* for reads */ + enum rbd_obj_write_state write_state; /* for writes */ }; - u32 which; /* posn image request list */ - enum obj_request_type type; + struct rbd_img_request *img_request; + struct ceph_file_extent *img_extents; + u32 num_img_extents; + union { - struct bio *bio_list; + struct ceph_bio_iter bio_pos; struct { - struct page **pages; - u32 page_count; + struct ceph_bvec_iter bvec_pos; + u32 bvec_count; + u32 bvec_idx; }; }; - struct page **copyup_pages; - u32 copyup_page_count; + struct bio_vec *copyup_bvecs; + u32 copyup_bvec_count; struct ceph_osd_request *osd_req; u64 xferred; /* bytes transferred */ int result; - rbd_obj_callback_t callback; - struct kref kref; }; enum img_req_flags { - IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */ IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */ IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ - IMG_REQ_DISCARD, /* discard: normal = 0, discard request = 1 */ }; struct rbd_img_request { struct rbd_device *rbd_dev; - u64 offset; /* starting image byte offset */ - u64 length; /* byte count from offset */ + enum obj_operation_type op_type; + enum obj_request_type data_type; unsigned long flags; union { u64 snap_id; /* for reads */ @@ -298,26 +285,21 @@ struct rbd_img_request { struct request *rq; /* block request */ struct rbd_obj_request *obj_request; /* obj req initiator */ }; - struct page **copyup_pages; - u32 copyup_page_count; - spinlock_t completion_lock;/* protects next_completion */ - u32 next_completion; - rbd_img_callback_t callback; + spinlock_t completion_lock; u64 xferred;/* aggregate bytes transferred */ int result; /* first nonzero obj_request result */ + struct list_head object_extents; /* obj_req.ex structs */ u32 obj_request_count; - struct list_head obj_requests; /* rbd_obj_request structs */ + u32 pending_count; struct kref kref; }; #define for_each_obj_request(ireq, oreq) \ - list_for_each_entry(oreq, &(ireq)->obj_requests, links) -#define for_each_obj_request_from(ireq, oreq) \ - list_for_each_entry_from(oreq, &(ireq)->obj_requests, links) + list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item) #define for_each_obj_request_safe(ireq, oreq, n) \ - list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) + list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item) enum rbd_watch_state { RBD_WATCH_STATE_UNREGISTERED, @@ -433,8 +415,6 @@ static DEFINE_SPINLOCK(rbd_client_list_lock); static struct kmem_cache *rbd_img_request_cache; static struct kmem_cache *rbd_obj_request_cache; -static struct bio_set *rbd_bio_clone; - static int rbd_major; static DEFINE_IDA(rbd_dev_id_ida); @@ -447,8 +427,6 @@ static bool single_major = true; module_param(single_major, bool, S_IRUGO); MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)"); -static int 
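With object requests now linked through the embedded ceph_object_extent (ex.oe_item), iteration goes through the macros above. A hedged usage sketch, assuming the structures in this patch:

    static u32 count_obj_requests(struct rbd_img_request *img_req)
    {
        struct rbd_obj_request *obj_req;
        u32 count = 0;

        for_each_obj_request(img_req, obj_req)
            count++;                        /* one entry per object extent */
        return count;
    }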
rbd_img_request_submit(struct rbd_img_request *img_request); - static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count); static ssize_t rbd_remove(struct bus_type *bus, const char *buf, @@ -458,7 +436,6 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf, static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf, size_t count); static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth); -static void rbd_spec_put(struct rbd_spec *spec); static int rbd_dev_id_to_minor(int dev_id) { @@ -577,9 +554,6 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) # define rbd_assert(expr) ((void) 0) #endif /* !RBD_DEBUG */ -static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request); -static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request); -static void rbd_img_parent_read(struct rbd_obj_request *obj_request); static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); static int rbd_dev_refresh(struct rbd_device *rbd_dev); @@ -857,26 +831,6 @@ static char* obj_op_name(enum obj_operation_type op_type) } /* - * Get a ceph client with specific addr and configuration, if one does - * not exist create it. Either way, ceph_opts is consumed by this - * function. - */ -static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) -{ - struct rbd_client *rbdc; - - mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING); - rbdc = rbd_client_find(ceph_opts); - if (rbdc) /* using an existing client */ - ceph_destroy_options(ceph_opts); - else - rbdc = rbd_client_create(ceph_opts); - mutex_unlock(&client_mutex); - - return rbdc; -} - -/* * Destroy ceph client * * Caller must hold rbd_client_list_lock. @@ -904,6 +858,56 @@ static void rbd_put_client(struct rbd_client *rbdc) kref_put(&rbdc->kref, rbd_client_release); } +static int wait_for_latest_osdmap(struct ceph_client *client) +{ + u64 newest_epoch; + int ret; + + ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch); + if (ret) + return ret; + + if (client->osdc.osdmap->epoch >= newest_epoch) + return 0; + + ceph_osdc_maybe_request_map(&client->osdc); + return ceph_monc_wait_osdmap(&client->monc, newest_epoch, + client->options->mount_timeout); +} + +/* + * Get a ceph client with specific addr and configuration, if one does + * not exist create it. Either way, ceph_opts is consumed by this + * function. + */ +static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) +{ + struct rbd_client *rbdc; + int ret; + + mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING); + rbdc = rbd_client_find(ceph_opts); + if (rbdc) { + ceph_destroy_options(ceph_opts); + + /* + * Using an existing client. Make sure ->pg_pools is up to + * date before we look up the pool id in do_rbd_add(). 
+ */ + ret = wait_for_latest_osdmap(rbdc->client); + if (ret) { + rbd_warn(NULL, "failed to get latest osdmap: %d", ret); + rbd_put_client(rbdc); + rbdc = ERR_PTR(ret); + } + } else { + rbdc = rbd_client_create(ceph_opts); + } + mutex_unlock(&client_mutex); + + return rbdc; +} + static bool rbd_image_format_valid(u32 image_format) { return image_format == 1 || image_format == 2; @@ -1223,272 +1227,59 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) rbd_dev->mapping.features = 0; } -static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) -{ - u64 segment_size = rbd_obj_bytes(&rbd_dev->header); - - return offset & (segment_size - 1); -} - -static u64 rbd_segment_length(struct rbd_device *rbd_dev, - u64 offset, u64 length) -{ - u64 segment_size = rbd_obj_bytes(&rbd_dev->header); - - offset &= segment_size - 1; - - rbd_assert(length <= U64_MAX - offset); - if (offset + length > segment_size) - length = segment_size - offset; - - return length; -} - -/* - * bio helpers - */ - -static void bio_chain_put(struct bio *chain) -{ - struct bio *tmp; - - while (chain) { - tmp = chain; - chain = chain->bi_next; - bio_put(tmp); - } -} - -/* - * zeros a bio chain, starting at specific offset - */ -static void zero_bio_chain(struct bio *chain, int start_ofs) +static void zero_bvec(struct bio_vec *bv) { - struct bio_vec bv; - struct bvec_iter iter; - unsigned long flags; void *buf; - int pos = 0; - - while (chain) { - bio_for_each_segment(bv, chain, iter) { - if (pos + bv.bv_len > start_ofs) { - int remainder = max(start_ofs - pos, 0); - buf = bvec_kmap_irq(&bv, &flags); - memset(buf + remainder, 0, - bv.bv_len - remainder); - flush_dcache_page(bv.bv_page); - bvec_kunmap_irq(buf, &flags); - } - pos += bv.bv_len; - } + unsigned long flags; - chain = chain->bi_next; - } + buf = bvec_kmap_irq(bv, &flags); + memset(buf, 0, bv->bv_len); + flush_dcache_page(bv->bv_page); + bvec_kunmap_irq(buf, &flags); } -/* - * similar to zero_bio_chain(), zeros data defined by a page array, - * starting at the given byte offset from the start of the array and - * continuing up to the given end offset. The pages array is - * assumed to be big enough to hold all bytes up to the end. - */ -static void zero_pages(struct page **pages, u64 offset, u64 end) +static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes) { - struct page **page = &pages[offset >> PAGE_SHIFT]; + struct ceph_bio_iter it = *bio_pos; - rbd_assert(end > offset); - rbd_assert(end - offset <= (u64)SIZE_MAX); - while (offset < end) { - size_t page_offset; - size_t length; - unsigned long flags; - void *kaddr; - - page_offset = offset & ~PAGE_MASK; - length = min_t(size_t, PAGE_SIZE - page_offset, end - offset); - local_irq_save(flags); - kaddr = kmap_atomic(*page); - memset(kaddr + page_offset, 0, length); - flush_dcache_page(*page); - kunmap_atomic(kaddr); - local_irq_restore(flags); - - offset += length; - page++; - } + ceph_bio_iter_advance(&it, off); + ceph_bio_iter_advance_step(&it, bytes, ({ + zero_bvec(&bv); + })); } -/* - * Clone a portion of a bio, starting at the given byte offset - * and continuing for the number of bytes indicated. 
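zero_bios() and zero_bvecs() lean on the ceph_*_iter_advance_step() macros, which execute a statement expression once per bio_vec and expose the current vector as bv. A small sketch of the same pattern, with made-up offsets:

    static void zero_first_4k(struct ceph_bio_iter *pos)
    {
        struct ceph_bio_iter it = *pos;     /* advance a private copy */

        ceph_bio_iter_advance(&it, 512);    /* skip an assumed 512-byte header */
        ceph_bio_iter_advance_step(&it, 4096, ({
            zero_bvec(&bv);                 /* bv is supplied by the macro */
        }));
    }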
- */ -static struct bio *bio_clone_range(struct bio *bio_src, - unsigned int offset, - unsigned int len, - gfp_t gfpmask) +static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) { - struct bio *bio; - - bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone); - if (!bio) - return NULL; /* ENOMEM */ + struct ceph_bvec_iter it = *bvec_pos; - bio_advance(bio, offset); - bio->bi_iter.bi_size = len; - - return bio; + ceph_bvec_iter_advance(&it, off); + ceph_bvec_iter_advance_step(&it, bytes, ({ + zero_bvec(&bv); + })); } /* - * Clone a portion of a bio chain, starting at the given byte offset - * into the first bio in the source chain and continuing for the - * number of bytes indicated. The result is another bio chain of - * exactly the given length, or a null pointer on error. - * - * The bio_src and offset parameters are both in-out. On entry they - * refer to the first source bio and the offset into that bio where - * the start of data to be cloned is located. + * Zero a range in @obj_req data buffer defined by a bio (list) or + * (private) bio_vec array. * - * On return, bio_src is updated to refer to the bio in the source - * chain that contains first un-cloned byte, and *offset will - * contain the offset of that byte within that bio. + * @off is relative to the start of the data buffer. */ -static struct bio *bio_chain_clone_range(struct bio **bio_src, - unsigned int *offset, - unsigned int len, - gfp_t gfpmask) +static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, + u32 bytes) { - struct bio *bi = *bio_src; - unsigned int off = *offset; - struct bio *chain = NULL; - struct bio **end; - - /* Build up a chain of clone bios up to the limit */ - - if (!bi || off >= bi->bi_iter.bi_size || !len) - return NULL; /* Nothing to clone */ - - end = &chain; - while (len) { - unsigned int bi_size; - struct bio *bio; - - if (!bi) { - rbd_warn(NULL, "bio_chain exhausted with %u left", len); - goto out_err; /* EINVAL; ran out of bio's */ - } - bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len); - bio = bio_clone_range(bi, off, bi_size, gfpmask); - if (!bio) - goto out_err; /* ENOMEM */ - - *end = bio; - end = &bio->bi_next; - - off += bi_size; - if (off == bi->bi_iter.bi_size) { - bi = bi->bi_next; - off = 0; - } - len -= bi_size; - } - *bio_src = bi; - *offset = off; - - return chain; -out_err: - bio_chain_put(chain); - - return NULL; -} - -/* - * The default/initial value for all object request flags is 0. For - * each flag, once its value is set to 1 it is never reset to 0 - * again. 
- */ -static void obj_request_img_data_set(struct rbd_obj_request *obj_request) -{ - if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) { - struct rbd_device *rbd_dev; - - rbd_dev = obj_request->img_request->rbd_dev; - rbd_warn(rbd_dev, "obj_request %p already marked img_data", - obj_request); - } -} - -static bool obj_request_img_data_test(struct rbd_obj_request *obj_request) -{ - smp_mb(); - return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0; -} - -static void obj_request_done_set(struct rbd_obj_request *obj_request) -{ - if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) { - struct rbd_device *rbd_dev = NULL; - - if (obj_request_img_data_test(obj_request)) - rbd_dev = obj_request->img_request->rbd_dev; - rbd_warn(rbd_dev, "obj_request %p already marked done", - obj_request); + switch (obj_req->img_request->data_type) { + case OBJ_REQUEST_BIO: + zero_bios(&obj_req->bio_pos, off, bytes); + break; + case OBJ_REQUEST_BVECS: + case OBJ_REQUEST_OWN_BVECS: + zero_bvecs(&obj_req->bvec_pos, off, bytes); + break; + default: + rbd_assert(0); } } -static bool obj_request_done_test(struct rbd_obj_request *obj_request) -{ - smp_mb(); - return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0; -} - -/* - * This sets the KNOWN flag after (possibly) setting the EXISTS - * flag. The latter is set based on the "exists" value provided. - * - * Note that for our purposes once an object exists it never goes - * away again. It's possible that the response from two existence - * checks are separated by the creation of the target object, and - * the first ("doesn't exist") response arrives *after* the second - * ("does exist"). In that case we ignore the second one. - */ -static void obj_request_existence_set(struct rbd_obj_request *obj_request, - bool exists) -{ - if (exists) - set_bit(OBJ_REQ_EXISTS, &obj_request->flags); - set_bit(OBJ_REQ_KNOWN, &obj_request->flags); - smp_mb(); -} - -static bool obj_request_known_test(struct rbd_obj_request *obj_request) -{ - smp_mb(); - return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0; -} - -static bool obj_request_exists_test(struct rbd_obj_request *obj_request) -{ - smp_mb(); - return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0; -} - -static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request) -{ - struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; - - return obj_request->img_offset < - round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header)); -} - -static void rbd_obj_request_get(struct rbd_obj_request *obj_request) -{ - dout("%s: obj %p (was %d)\n", __func__, obj_request, - kref_read(&obj_request->kref)); - kref_get(&obj_request->kref); -} - static void rbd_obj_request_destroy(struct kref *kref); static void rbd_obj_request_put(struct rbd_obj_request *obj_request) { @@ -1505,18 +1296,13 @@ static void rbd_img_request_get(struct rbd_img_request *img_request) kref_get(&img_request->kref); } -static bool img_request_child_test(struct rbd_img_request *img_request); -static void rbd_parent_request_destroy(struct kref *kref); static void rbd_img_request_destroy(struct kref *kref); static void rbd_img_request_put(struct rbd_img_request *img_request) { rbd_assert(img_request != NULL); dout("%s: img %p (was %d)\n", __func__, img_request, kref_read(&img_request->kref)); - if (img_request_child_test(img_request)) - kref_put(&img_request->kref, rbd_parent_request_destroy); - else - kref_put(&img_request->kref, rbd_img_request_destroy); + kref_put(&img_request->kref, rbd_img_request_destroy); } static 
inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, @@ -1526,139 +1312,37 @@ static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, /* Image request now owns object's original reference */ obj_request->img_request = img_request; - obj_request->which = img_request->obj_request_count; - rbd_assert(!obj_request_img_data_test(obj_request)); - obj_request_img_data_set(obj_request); - rbd_assert(obj_request->which != BAD_WHICH); img_request->obj_request_count++; - list_add_tail(&obj_request->links, &img_request->obj_requests); - dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, - obj_request->which); + img_request->pending_count++; + dout("%s: img %p obj %p\n", __func__, img_request, obj_request); } static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, struct rbd_obj_request *obj_request) { - rbd_assert(obj_request->which != BAD_WHICH); - - dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, - obj_request->which); - list_del(&obj_request->links); + dout("%s: img %p obj %p\n", __func__, img_request, obj_request); + list_del(&obj_request->ex.oe_item); rbd_assert(img_request->obj_request_count > 0); img_request->obj_request_count--; - rbd_assert(obj_request->which == img_request->obj_request_count); - obj_request->which = BAD_WHICH; - rbd_assert(obj_request_img_data_test(obj_request)); rbd_assert(obj_request->img_request == img_request); - obj_request->img_request = NULL; - obj_request->callback = NULL; rbd_obj_request_put(obj_request); } -static bool obj_request_type_valid(enum obj_request_type type) -{ - switch (type) { - case OBJ_REQUEST_NODATA: - case OBJ_REQUEST_BIO: - case OBJ_REQUEST_PAGES: - return true; - default: - return false; - } -} - -static void rbd_img_obj_callback(struct rbd_obj_request *obj_request); - static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) { struct ceph_osd_request *osd_req = obj_request->osd_req; dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, - obj_request, obj_request->object_no, obj_request->offset, - obj_request->length, osd_req); - if (obj_request_img_data_test(obj_request)) { - WARN_ON(obj_request->callback != rbd_img_obj_callback); - rbd_img_request_get(obj_request->img_request); - } + obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off, + obj_request->ex.oe_len, osd_req); ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } -static void rbd_img_request_complete(struct rbd_img_request *img_request) -{ - - dout("%s: img %p\n", __func__, img_request); - - /* - * If no error occurred, compute the aggregate transfer - * count for the image request. We could instead use - * atomic64_cmpxchg() to update it as each object request - * completes; not clear which way is better off hand. - */ - if (!img_request->result) { - struct rbd_obj_request *obj_request; - u64 xferred = 0; - - for_each_obj_request(img_request, obj_request) - xferred += obj_request->xferred; - img_request->xferred = xferred; - } - - if (img_request->callback) - img_request->callback(img_request); - else - rbd_img_request_put(img_request); -} - /* * The default/initial value for all image request flags is 0. Each * is conditionally set to 1 at image request initialization time * and currently never change thereafter. 
*/ -static void img_request_write_set(struct rbd_img_request *img_request) -{ - set_bit(IMG_REQ_WRITE, &img_request->flags); - smp_mb(); -} - -static bool img_request_write_test(struct rbd_img_request *img_request) -{ - smp_mb(); - return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0; -} - -/* - * Set the discard flag when the img_request is an discard request - */ -static void img_request_discard_set(struct rbd_img_request *img_request) -{ - set_bit(IMG_REQ_DISCARD, &img_request->flags); - smp_mb(); -} - -static bool img_request_discard_test(struct rbd_img_request *img_request) -{ - smp_mb(); - return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0; -} - -static void img_request_child_set(struct rbd_img_request *img_request) -{ - set_bit(IMG_REQ_CHILD, &img_request->flags); - smp_mb(); -} - -static void img_request_child_clear(struct rbd_img_request *img_request) -{ - clear_bit(IMG_REQ_CHILD, &img_request->flags); - smp_mb(); -} - -static bool img_request_child_test(struct rbd_img_request *img_request) -{ - smp_mb(); - return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0; -} - static void img_request_layered_set(struct rbd_img_request *img_request) { set_bit(IMG_REQ_LAYERED, &img_request->flags); @@ -1677,209 +1361,70 @@ static bool img_request_layered_test(struct rbd_img_request *img_request) return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; } -static enum obj_operation_type -rbd_img_request_op_type(struct rbd_img_request *img_request) -{ - if (img_request_write_test(img_request)) - return OBJ_OP_WRITE; - else if (img_request_discard_test(img_request)) - return OBJ_OP_DISCARD; - else - return OBJ_OP_READ; -} - -static void -rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) -{ - u64 xferred = obj_request->xferred; - u64 length = obj_request->length; - - dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, - obj_request, obj_request->img_request, obj_request->result, - xferred, length); - /* - * ENOENT means a hole in the image. We zero-fill the entire - * length of the request. A short read also implies zero-fill - * to the end of the request. An error requires the whole - * length of the request to be reported finished with an error - * to the block layer. In each case we update the xferred - * count to indicate the whole request was satisfied. 
- */ - rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); - if (obj_request->result == -ENOENT) { - if (obj_request->type == OBJ_REQUEST_BIO) - zero_bio_chain(obj_request->bio_list, 0); - else - zero_pages(obj_request->pages, 0, length); - obj_request->result = 0; - } else if (xferred < length && !obj_request->result) { - if (obj_request->type == OBJ_REQUEST_BIO) - zero_bio_chain(obj_request->bio_list, xferred); - else - zero_pages(obj_request->pages, xferred, length); - } - obj_request->xferred = length; - obj_request_done_set(obj_request); -} - -static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) +static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req) { - dout("%s: obj %p cb %p\n", __func__, obj_request, - obj_request->callback); - obj_request->callback(obj_request); -} + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; -static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err) -{ - obj_request->result = err; - obj_request->xferred = 0; - /* - * kludge - mirror rbd_obj_request_submit() to match a put in - * rbd_img_obj_callback() - */ - if (obj_request_img_data_test(obj_request)) { - WARN_ON(obj_request->callback != rbd_img_obj_callback); - rbd_img_request_get(obj_request->img_request); - } - obj_request_done_set(obj_request); - rbd_obj_request_complete(obj_request); + return !obj_req->ex.oe_off && + obj_req->ex.oe_len == rbd_dev->layout.object_size; } -static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) +static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req) { - struct rbd_img_request *img_request = NULL; - struct rbd_device *rbd_dev = NULL; - bool layered = false; - - if (obj_request_img_data_test(obj_request)) { - img_request = obj_request->img_request; - layered = img_request && img_request_layered_test(img_request); - rbd_dev = img_request->rbd_dev; - } - - dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, - obj_request, img_request, obj_request->result, - obj_request->xferred, obj_request->length); - if (layered && obj_request->result == -ENOENT && - obj_request->img_offset < rbd_dev->parent_overlap) - rbd_img_parent_read(obj_request); - else if (img_request) - rbd_img_obj_request_read_callback(obj_request); - else - obj_request_done_set(obj_request); -} + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; -static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) -{ - dout("%s: obj %p result %d %llu\n", __func__, obj_request, - obj_request->result, obj_request->length); - /* - * There is no such thing as a successful short write. Set - * it to our originally-requested length. - */ - obj_request->xferred = obj_request->length; - obj_request_done_set(obj_request); + return obj_req->ex.oe_off + obj_req->ex.oe_len == + rbd_dev->layout.object_size; } -static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request) +static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req) { - dout("%s: obj %p result %d %llu\n", __func__, obj_request, - obj_request->result, obj_request->length); - /* - * There is no such thing as a successful short discard. Set - * it to our originally-requested length. - */ - obj_request->xferred = obj_request->length; - /* discarding a non-existent object is not a problem */ - if (obj_request->result == -ENOENT) - obj_request->result = 0; - obj_request_done_set(obj_request); + return ceph_file_extents_bytes(obj_req->img_extents, + obj_req->num_img_extents); } -/* - * For a simple stat call there's nothing to do. 
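A worked example for the two predicates above, assuming the default 4 MiB object size:

    /*
     * object_size = 4 MiB:
     *   oe_off = 0,     oe_len = 4 MiB  ->  rbd_obj_is_entire() is true
     *   oe_off = 1 MiB, oe_len = 3 MiB  ->  rbd_obj_is_tail() is true
     *   oe_off = 0,     oe_len = 1 MiB  ->  neither predicate holds
     */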
We'll do more if - * this is part of a write sequence for a layered image. - */ -static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request) +static bool rbd_img_is_write(struct rbd_img_request *img_req) { - dout("%s: obj %p\n", __func__, obj_request); - obj_request_done_set(obj_request); + switch (img_req->op_type) { + case OBJ_OP_READ: + return false; + case OBJ_OP_WRITE: + case OBJ_OP_DISCARD: + return true; + default: + rbd_assert(0); + } } -static void rbd_osd_call_callback(struct rbd_obj_request *obj_request) -{ - dout("%s: obj %p\n", __func__, obj_request); - - if (obj_request_img_data_test(obj_request)) - rbd_osd_copyup_callback(obj_request); - else - obj_request_done_set(obj_request); -} +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req); static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) { - struct rbd_obj_request *obj_request = osd_req->r_priv; - u16 opcode; + struct rbd_obj_request *obj_req = osd_req->r_priv; - dout("%s: osd_req %p\n", __func__, osd_req); - rbd_assert(osd_req == obj_request->osd_req); - if (obj_request_img_data_test(obj_request)) { - rbd_assert(obj_request->img_request); - rbd_assert(obj_request->which != BAD_WHICH); - } else { - rbd_assert(obj_request->which == BAD_WHICH); - } - - if (osd_req->r_result < 0) - obj_request->result = osd_req->r_result; - - /* - * We support a 64-bit length, but ultimately it has to be - * passed to the block layer, which just supports a 32-bit - * length field. - */ - obj_request->xferred = osd_req->r_ops[0].outdata_len; - rbd_assert(obj_request->xferred < (u64)UINT_MAX); + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, + osd_req->r_result, obj_req); + rbd_assert(osd_req == obj_req->osd_req); - opcode = osd_req->r_ops[0].op; - switch (opcode) { - case CEPH_OSD_OP_READ: - rbd_osd_read_callback(obj_request); - break; - case CEPH_OSD_OP_SETALLOCHINT: - rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE || - osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL); - /* fall through */ - case CEPH_OSD_OP_WRITE: - case CEPH_OSD_OP_WRITEFULL: - rbd_osd_write_callback(obj_request); - break; - case CEPH_OSD_OP_STAT: - rbd_osd_stat_callback(obj_request); - break; - case CEPH_OSD_OP_DELETE: - case CEPH_OSD_OP_TRUNCATE: - case CEPH_OSD_OP_ZERO: - rbd_osd_discard_callback(obj_request); - break; - case CEPH_OSD_OP_CALL: - rbd_osd_call_callback(obj_request); - break; - default: - rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d", - obj_request->object_no, opcode); - break; - } + obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0; + if (!obj_req->result && !rbd_img_is_write(obj_req->img_request)) + obj_req->xferred = osd_req->r_result; + else + /* + * Writes aren't allowed to return a data payload. In some + * guarded write cases (e.g. stat + zero on an empty object) + * a stat response makes it through, but we don't care. 
+ */ + obj_req->xferred = 0; - if (obj_request_done_test(obj_request)) - rbd_obj_request_complete(obj_request); + rbd_obj_handle_request(obj_req); } static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) { struct ceph_osd_request *osd_req = obj_request->osd_req; - rbd_assert(obj_request_img_data_test(obj_request)); + osd_req->r_flags = CEPH_OSD_FLAG_READ; osd_req->r_snapid = obj_request->img_request->snap_id; } @@ -1887,32 +1432,33 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) { struct ceph_osd_request *osd_req = obj_request->osd_req; + osd_req->r_flags = CEPH_OSD_FLAG_WRITE; ktime_get_real_ts(&osd_req->r_mtime); - osd_req->r_data_offset = obj_request->offset; + osd_req->r_data_offset = obj_request->ex.oe_off; } static struct ceph_osd_request * -__rbd_osd_req_create(struct rbd_device *rbd_dev, - struct ceph_snap_context *snapc, - int num_ops, unsigned int flags, - struct rbd_obj_request *obj_request) +rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops) { + struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_device *rbd_dev = img_req->rbd_dev; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_request *req; const char *name_format = rbd_dev->image_format == 1 ? RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; - req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); + req = ceph_osdc_alloc_request(osdc, + (rbd_img_is_write(img_req) ? img_req->snapc : NULL), + num_ops, false, GFP_NOIO); if (!req) return NULL; - req->r_flags = flags; req->r_callback = rbd_osd_req_callback; - req->r_priv = obj_request; + req->r_priv = obj_req; req->r_base_oloc.pool = rbd_dev->layout.pool_id; if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, - rbd_dev->header.object_prefix, obj_request->object_no)) + rbd_dev->header.object_prefix, obj_req->ex.oe_objno)) goto err_req; if (ceph_osdc_alloc_messages(req, GFP_NOIO)) @@ -1925,83 +1471,20 @@ err_req: return NULL; } -/* - * Create an osd request. A read request has one osd op (read). - * A write request has either one (watch) or two (hint+write) osd ops. - * (All rbd data writes are prefixed with an allocation hint op, but - * technically osd watch is a write request, hence this distinction.) - */ -static struct ceph_osd_request *rbd_osd_req_create( - struct rbd_device *rbd_dev, - enum obj_operation_type op_type, - unsigned int num_ops, - struct rbd_obj_request *obj_request) -{ - struct ceph_snap_context *snapc = NULL; - - if (obj_request_img_data_test(obj_request) && - (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) { - struct rbd_img_request *img_request = obj_request->img_request; - if (op_type == OBJ_OP_WRITE) { - rbd_assert(img_request_write_test(img_request)); - } else { - rbd_assert(img_request_discard_test(img_request)); - } - snapc = img_request->snapc; - } - - rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2)); - - return __rbd_osd_req_create(rbd_dev, snapc, num_ops, - (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ? - CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request); -} - -/* - * Create a copyup osd request based on the information in the object - * request supplied. A copyup request has two or three osd ops, a - * copyup method call, potentially a hint op, and a write or truncate - * or zero op. 
- */ -static struct ceph_osd_request * -rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) -{ - struct rbd_img_request *img_request; - int num_osd_ops = 3; - - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; - rbd_assert(img_request); - rbd_assert(img_request_write_test(img_request) || - img_request_discard_test(img_request)); - - if (img_request_discard_test(img_request)) - num_osd_ops = 2; - - return __rbd_osd_req_create(img_request->rbd_dev, - img_request->snapc, num_osd_ops, - CEPH_OSD_FLAG_WRITE, obj_request); -} - static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) { ceph_osdc_put_request(osd_req); } -static struct rbd_obj_request * -rbd_obj_request_create(enum obj_request_type type) +static struct rbd_obj_request *rbd_obj_request_create(void) { struct rbd_obj_request *obj_request; - rbd_assert(obj_request_type_valid(type)); - obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); if (!obj_request) return NULL; - obj_request->which = BAD_WHICH; - obj_request->type = type; - INIT_LIST_HEAD(&obj_request->links); + ceph_object_extent_init(&obj_request->ex); kref_init(&obj_request->kref); dout("%s %p\n", __func__, obj_request); @@ -2011,32 +1494,34 @@ rbd_obj_request_create(enum obj_request_type type) static void rbd_obj_request_destroy(struct kref *kref) { struct rbd_obj_request *obj_request; + u32 i; obj_request = container_of(kref, struct rbd_obj_request, kref); dout("%s: obj %p\n", __func__, obj_request); - rbd_assert(obj_request->img_request == NULL); - rbd_assert(obj_request->which == BAD_WHICH); - if (obj_request->osd_req) rbd_osd_req_destroy(obj_request->osd_req); - rbd_assert(obj_request_type_valid(obj_request->type)); - switch (obj_request->type) { + switch (obj_request->img_request->data_type) { case OBJ_REQUEST_NODATA: - break; /* Nothing to do */ case OBJ_REQUEST_BIO: - if (obj_request->bio_list) - bio_chain_put(obj_request->bio_list); - break; - case OBJ_REQUEST_PAGES: - /* img_data requests don't own their page array */ - if (obj_request->pages && - !obj_request_img_data_test(obj_request)) - ceph_release_page_vector(obj_request->pages, - obj_request->page_count); + case OBJ_REQUEST_BVECS: + break; /* Nothing to do */ + case OBJ_REQUEST_OWN_BVECS: + kfree(obj_request->bvec_pos.bvecs); break; + default: + rbd_assert(0); + } + + kfree(obj_request->img_extents); + if (obj_request->copyup_bvecs) { + for (i = 0; i < obj_request->copyup_bvec_count; i++) { + if (obj_request->copyup_bvecs[i].bv_page) + __free_page(obj_request->copyup_bvecs[i].bv_page); + } + kfree(obj_request->copyup_bvecs); } kmem_cache_free(rbd_obj_request_cache, obj_request); @@ -2111,7 +1596,6 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev) */ static struct rbd_img_request *rbd_img_request_create( struct rbd_device *rbd_dev, - u64 offset, u64 length, enum obj_operation_type op_type, struct ceph_snap_context *snapc) { @@ -2122,27 +1606,21 @@ static struct rbd_img_request *rbd_img_request_create( return NULL; img_request->rbd_dev = rbd_dev; - img_request->offset = offset; - img_request->length = length; - if (op_type == OBJ_OP_DISCARD) { - img_request_discard_set(img_request); - img_request->snapc = snapc; - } else if (op_type == OBJ_OP_WRITE) { - img_request_write_set(img_request); - img_request->snapc = snapc; - } else { + img_request->op_type = op_type; + if (!rbd_img_is_write(img_request)) img_request->snap_id = rbd_dev->spec->snap_id; - } + else + img_request->snapc = snapc; + if (rbd_dev_parent_get(rbd_dev)) 
img_request_layered_set(img_request); spin_lock_init(&img_request->completion_lock); - INIT_LIST_HEAD(&img_request->obj_requests); + INIT_LIST_HEAD(&img_request->object_extents); kref_init(&img_request->kref); - dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, - obj_op_name(op_type), offset, length, img_request); - + dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev, + obj_op_name(op_type), img_request); return img_request; } @@ -2165,829 +1643,934 @@ static void rbd_img_request_destroy(struct kref *kref) rbd_dev_parent_put(img_request->rbd_dev); } - if (img_request_write_test(img_request) || - img_request_discard_test(img_request)) + if (rbd_img_is_write(img_request)) ceph_put_snap_context(img_request->snapc); kmem_cache_free(rbd_img_request_cache, img_request); } -static struct rbd_img_request *rbd_parent_request_create( - struct rbd_obj_request *obj_request, - u64 img_offset, u64 length) +static void prune_extents(struct ceph_file_extent *img_extents, + u32 *num_img_extents, u64 overlap) { - struct rbd_img_request *parent_request; - struct rbd_device *rbd_dev; + u32 cnt = *num_img_extents; - rbd_assert(obj_request->img_request); - rbd_dev = obj_request->img_request->rbd_dev; + /* drop extents completely beyond the overlap */ + while (cnt && img_extents[cnt - 1].fe_off >= overlap) + cnt--; - parent_request = rbd_img_request_create(rbd_dev->parent, img_offset, - length, OBJ_OP_READ, NULL); - if (!parent_request) - return NULL; + if (cnt) { + struct ceph_file_extent *ex = &img_extents[cnt - 1]; - img_request_child_set(parent_request); - rbd_obj_request_get(obj_request); - parent_request->obj_request = obj_request; + /* trim final overlapping extent */ + if (ex->fe_off + ex->fe_len > overlap) + ex->fe_len = overlap - ex->fe_off; + } - return parent_request; + *num_img_extents = cnt; } -static void rbd_parent_request_destroy(struct kref *kref) +/* + * Determine the byte range(s) covered by either just the object extent + * or the entire object in the parent image. + */ +static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req, + bool entire) { - struct rbd_img_request *parent_request; - struct rbd_obj_request *orig_request; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; - parent_request = container_of(kref, struct rbd_img_request, kref); - orig_request = parent_request->obj_request; + if (!rbd_dev->parent_overlap) + return 0; - parent_request->obj_request = NULL; - rbd_obj_request_put(orig_request); - img_request_child_clear(parent_request); + ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno, + entire ? 0 : obj_req->ex.oe_off, + entire ? 
rbd_dev->layout.object_size : + obj_req->ex.oe_len, + &obj_req->img_extents, + &obj_req->num_img_extents); + if (ret) + return ret; - rbd_img_request_destroy(kref); + prune_extents(obj_req->img_extents, &obj_req->num_img_extents, + rbd_dev->parent_overlap); + return 0; } -static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) +static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) { - struct rbd_img_request *img_request; - unsigned int xferred; - int result; - bool more; - - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; - - rbd_assert(obj_request->xferred <= (u64)UINT_MAX); - xferred = (unsigned int)obj_request->xferred; - result = obj_request->result; - if (result) { - struct rbd_device *rbd_dev = img_request->rbd_dev; - enum obj_operation_type op_type; - - if (img_request_discard_test(img_request)) - op_type = OBJ_OP_DISCARD; - else if (img_request_write_test(img_request)) - op_type = OBJ_OP_WRITE; - else - op_type = OBJ_OP_READ; - - rbd_warn(rbd_dev, "%s %llx at %llx (%llx)", - obj_op_name(op_type), obj_request->length, - obj_request->img_offset, obj_request->offset); - rbd_warn(rbd_dev, " result %d xferred %x", - result, xferred); - if (!img_request->result) - img_request->result = result; - /* - * Need to end I/O on the entire obj_request worth of - * bytes in case of error. - */ - xferred = obj_request->length; + switch (obj_req->img_request->data_type) { + case OBJ_REQUEST_BIO: + osd_req_op_extent_osd_data_bio(obj_req->osd_req, which, + &obj_req->bio_pos, + obj_req->ex.oe_len); + break; + case OBJ_REQUEST_BVECS: + case OBJ_REQUEST_OWN_BVECS: + rbd_assert(obj_req->bvec_pos.iter.bi_size == + obj_req->ex.oe_len); + rbd_assert(obj_req->bvec_idx == obj_req->bvec_count); + osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which, + &obj_req->bvec_pos); + break; + default: + rbd_assert(0); } +} - if (img_request_child_test(img_request)) { - rbd_assert(img_request->obj_request != NULL); - more = obj_request->which < img_request->obj_request_count - 1; - } else { - blk_status_t status = errno_to_blk_status(result); +static int rbd_obj_setup_read(struct rbd_obj_request *obj_req) +{ + obj_req->osd_req = rbd_osd_req_create(obj_req, 1); + if (!obj_req->osd_req) + return -ENOMEM; - rbd_assert(img_request->rq != NULL); + osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ, + obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); + rbd_osd_req_setup_data(obj_req, 0); - more = blk_update_request(img_request->rq, status, xferred); - if (!more) - __blk_mq_end_request(img_request->rq, status); - } + rbd_osd_req_format_read(obj_req); + return 0; +} + +static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, + unsigned int which) +{ + struct page **pages; - return more; + /* + * The response data for a STAT call consists of: + * le64 length; + * struct { + * le32 tv_sec; + * le32 tv_nsec; + * } mtime; + */ + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0); + osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages, + 8 + sizeof(struct ceph_timespec), + 0, false, true); + return 0; } -static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) +static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req, + unsigned int which) { - struct rbd_img_request *img_request; - u32 which = obj_request->which; - bool more = true; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; 
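prune_extents() clips the reverse-mapped extents at the parent overlap; a worked example with hypothetical numbers:

    /*
     * overlap = 10, img_extents = [(off 0, len 5), (off 8, len 4), (off 12, len 3)]
     *   (12, 3): starts at or beyond the overlap  -> dropped
     *   (8, 4):  ends at 12, past the overlap     -> trimmed to (8, 2)
     *   result:  [(0, 5), (8, 2)], *num_img_extents = 2
     */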
+ u16 opcode; - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; + osd_req_op_alloc_hint_init(obj_req->osd_req, which++, + rbd_dev->layout.object_size, + rbd_dev->layout.object_size); - dout("%s: img %p obj %p\n", __func__, img_request, obj_request); - rbd_assert(img_request != NULL); - rbd_assert(img_request->obj_request_count > 0); - rbd_assert(which != BAD_WHICH); - rbd_assert(which < img_request->obj_request_count); + if (rbd_obj_is_entire(obj_req)) + opcode = CEPH_OSD_OP_WRITEFULL; + else + opcode = CEPH_OSD_OP_WRITE; - spin_lock_irq(&img_request->completion_lock); - if (which != img_request->next_completion) - goto out; + osd_req_op_extent_init(obj_req->osd_req, which, opcode, + obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); + rbd_osd_req_setup_data(obj_req, which++); + + rbd_assert(which == obj_req->osd_req->r_num_ops); + rbd_osd_req_format_write(obj_req); +} - for_each_obj_request_from(img_request, obj_request) { - rbd_assert(more); - rbd_assert(which < img_request->obj_request_count); +static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) +{ + unsigned int num_osd_ops, which = 0; + int ret; - if (!obj_request_done_test(obj_request)) - break; - more = rbd_img_obj_end_request(obj_request); - which++; + /* reverse map the entire object onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, true); + if (ret) + return ret; + + if (obj_req->num_img_extents) { + obj_req->write_state = RBD_OBJ_WRITE_GUARD; + num_osd_ops = 3; /* stat + setallochint + write/writefull */ + } else { + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + num_osd_ops = 2; /* setallochint + write/writefull */ } - rbd_assert(more ^ (which == img_request->obj_request_count)); - img_request->next_completion = which; -out: - spin_unlock_irq(&img_request->completion_lock); - rbd_img_request_put(img_request); + obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); + if (!obj_req->osd_req) + return -ENOMEM; - if (!more) - rbd_img_request_complete(img_request); + if (obj_req->num_img_extents) { + ret = __rbd_obj_setup_stat(obj_req, which++); + if (ret) + return ret; + } + + __rbd_obj_setup_write(obj_req, which); + return 0; } -/* - * Add individual osd ops to the given ceph_osd_request and prepare - * them for submission. num_ops is the current number of - * osd operations already to the object request. 
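Summarizing rbd_obj_setup_write() above, the initial state and op count depend only on whether parent data lies under the object (sketch):

    /*
     * num_img_extents > 0  ->  RBD_OBJ_WRITE_GUARD: stat + setallochint + write
     * num_img_extents == 0 ->  RBD_OBJ_WRITE_FLAT:  setallochint + write
     * (WRITEFULL replaces WRITE when the request covers the entire object.)
     */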
- */ -static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request, - struct ceph_osd_request *osd_request, - enum obj_operation_type op_type, - unsigned int num_ops) -{ - struct rbd_img_request *img_request = obj_request->img_request; - struct rbd_device *rbd_dev = img_request->rbd_dev; - u64 object_size = rbd_obj_bytes(&rbd_dev->header); - u64 offset = obj_request->offset; - u64 length = obj_request->length; - u64 img_end; +static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req, + unsigned int which) +{ u16 opcode; - if (op_type == OBJ_OP_DISCARD) { - if (!offset && length == object_size && - (!img_request_layered_test(img_request) || - !obj_request_overlaps_parent(obj_request))) { - opcode = CEPH_OSD_OP_DELETE; - } else if ((offset + length == object_size)) { + if (rbd_obj_is_entire(obj_req)) { + if (obj_req->num_img_extents) { + osd_req_op_init(obj_req->osd_req, which++, + CEPH_OSD_OP_CREATE, 0); opcode = CEPH_OSD_OP_TRUNCATE; } else { - down_read(&rbd_dev->header_rwsem); - img_end = rbd_dev->header.image_size; - up_read(&rbd_dev->header_rwsem); - - if (obj_request->img_offset + length == img_end) - opcode = CEPH_OSD_OP_TRUNCATE; - else - opcode = CEPH_OSD_OP_ZERO; + osd_req_op_init(obj_req->osd_req, which++, + CEPH_OSD_OP_DELETE, 0); + opcode = 0; } - } else if (op_type == OBJ_OP_WRITE) { - if (!offset && length == object_size) - opcode = CEPH_OSD_OP_WRITEFULL; - else - opcode = CEPH_OSD_OP_WRITE; - osd_req_op_alloc_hint_init(osd_request, num_ops, - object_size, object_size); - num_ops++; + } else if (rbd_obj_is_tail(obj_req)) { + opcode = CEPH_OSD_OP_TRUNCATE; } else { - opcode = CEPH_OSD_OP_READ; + opcode = CEPH_OSD_OP_ZERO; } - if (opcode == CEPH_OSD_OP_DELETE) - osd_req_op_init(osd_request, num_ops, opcode, 0); - else - osd_req_op_extent_init(osd_request, num_ops, opcode, - offset, length, 0, 0); - - if (obj_request->type == OBJ_REQUEST_BIO) - osd_req_op_extent_osd_data_bio(osd_request, num_ops, - obj_request->bio_list, length); - else if (obj_request->type == OBJ_REQUEST_PAGES) - osd_req_op_extent_osd_data_pages(osd_request, num_ops, - obj_request->pages, length, - offset & ~PAGE_MASK, false, false); - - /* Discards are also writes */ - if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) - rbd_osd_req_format_write(obj_request); - else - rbd_osd_req_format_read(obj_request); + if (opcode) + osd_req_op_extent_init(obj_req->osd_req, which++, opcode, + obj_req->ex.oe_off, obj_req->ex.oe_len, + 0, 0); + + rbd_assert(which == obj_req->osd_req->r_num_ops); + rbd_osd_req_format_write(obj_req); } -/* - * Split up an image request into one or more object requests, each - * to a different object. The "type" parameter indicates whether - * "data_desc" is the pointer to the head of a list of bio - * structures, or the base of a page array. In either case this - * function assumes data_desc describes memory sufficient to hold - * all data described by the image request. 
- */ -static int rbd_img_request_fill(struct rbd_img_request *img_request, - enum obj_request_type type, - void *data_desc) +static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) { - struct rbd_device *rbd_dev = img_request->rbd_dev; - struct rbd_obj_request *obj_request = NULL; - struct rbd_obj_request *next_obj_request; - struct bio *bio_list = NULL; - unsigned int bio_offset = 0; - struct page **pages = NULL; - enum obj_operation_type op_type; - u64 img_offset; - u64 resid; - - dout("%s: img %p type %d data_desc %p\n", __func__, img_request, - (int)type, data_desc); + unsigned int num_osd_ops, which = 0; + int ret; - img_offset = img_request->offset; - resid = img_request->length; - rbd_assert(resid > 0); - op_type = rbd_img_request_op_type(img_request); + /* reverse map the entire object onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, true); + if (ret) + return ret; - if (type == OBJ_REQUEST_BIO) { - bio_list = data_desc; - rbd_assert(img_offset == - bio_list->bi_iter.bi_sector << SECTOR_SHIFT); - } else if (type == OBJ_REQUEST_PAGES) { - pages = data_desc; + if (rbd_obj_is_entire(obj_req)) { + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + if (obj_req->num_img_extents) + num_osd_ops = 2; /* create + truncate */ + else + num_osd_ops = 1; /* delete */ + } else { + if (obj_req->num_img_extents) { + obj_req->write_state = RBD_OBJ_WRITE_GUARD; + num_osd_ops = 2; /* stat + truncate/zero */ + } else { + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + num_osd_ops = 1; /* truncate/zero */ + } } - while (resid) { - struct ceph_osd_request *osd_req; - u64 object_no = img_offset >> rbd_dev->header.obj_order; - u64 offset = rbd_segment_offset(rbd_dev, img_offset); - u64 length = rbd_segment_length(rbd_dev, img_offset, resid); - - obj_request = rbd_obj_request_create(type); - if (!obj_request) - goto out_unwind; - - obj_request->object_no = object_no; - obj_request->offset = offset; - obj_request->length = length; - - /* - * set obj_request->img_request before creating the - * osd_request so that it gets the right snapc - */ - rbd_img_obj_request_add(img_request, obj_request); - - if (type == OBJ_REQUEST_BIO) { - unsigned int clone_size; - - rbd_assert(length <= (u64)UINT_MAX); - clone_size = (unsigned int)length; - obj_request->bio_list = - bio_chain_clone_range(&bio_list, - &bio_offset, - clone_size, - GFP_NOIO); - if (!obj_request->bio_list) - goto out_unwind; - } else if (type == OBJ_REQUEST_PAGES) { - unsigned int page_count; - - obj_request->pages = pages; - page_count = (u32)calc_pages_for(offset, length); - obj_request->page_count = page_count; - if ((offset + length) & ~PAGE_MASK) - page_count--; /* more on last page */ - pages += page_count; - } + obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); + if (!obj_req->osd_req) + return -ENOMEM; - osd_req = rbd_osd_req_create(rbd_dev, op_type, - (op_type == OBJ_OP_WRITE) ? 2 : 1, - obj_request); - if (!osd_req) - goto out_unwind; + if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) { + ret = __rbd_obj_setup_stat(obj_req, which++); + if (ret) + return ret; + } - obj_request->osd_req = osd_req; - obj_request->callback = rbd_img_obj_callback; - obj_request->img_offset = img_offset; + __rbd_obj_setup_discard(obj_req, which); + return 0; +} - rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); +/* + * For each object request in @img_req, allocate an OSD request, add + * individual OSD ops and prepare them for submission. The number of + * OSD ops depends on op_type and the overlap point (if any). 
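The discard setup above boils down to the following opcode choices (sketch):

    /*
     * entire object, parent data present -> CREATE + TRUNCATE
     *   (the object must keep existing so reads don't fall through to the parent)
     * entire object, no parent data      -> DELETE
     * range ending at the object's end   -> TRUNCATE
     * interior range                     -> ZERO
     * Only partial discards with parent data take the guarded (stat-prefixed) path.
     */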
+ */ +static int __rbd_img_fill_request(struct rbd_img_request *img_req) +{ + struct rbd_obj_request *obj_req; + int ret; - img_offset += length; - resid -= length; + for_each_obj_request(img_req, obj_req) { + switch (img_req->op_type) { + case OBJ_OP_READ: + ret = rbd_obj_setup_read(obj_req); + break; + case OBJ_OP_WRITE: + ret = rbd_obj_setup_write(obj_req); + break; + case OBJ_OP_DISCARD: + ret = rbd_obj_setup_discard(obj_req); + break; + default: + rbd_assert(0); + } + if (ret) + return ret; } return 0; +} -out_unwind: - for_each_obj_request_safe(img_request, obj_request, next_obj_request) - rbd_img_obj_request_del(img_request, obj_request); +union rbd_img_fill_iter { + struct ceph_bio_iter bio_iter; + struct ceph_bvec_iter bvec_iter; +}; - return -ENOMEM; -} +struct rbd_img_fill_ctx { + enum obj_request_type pos_type; + union rbd_img_fill_iter *pos; + union rbd_img_fill_iter iter; + ceph_object_extent_fn_t set_pos_fn; + ceph_object_extent_fn_t count_fn; + ceph_object_extent_fn_t copy_fn; +}; -static void -rbd_osd_copyup_callback(struct rbd_obj_request *obj_request) +static struct ceph_object_extent *alloc_object_extent(void *arg) { - struct rbd_img_request *img_request; - struct rbd_device *rbd_dev; - struct page **pages; - u32 page_count; + struct rbd_img_request *img_req = arg; + struct rbd_obj_request *obj_req; - dout("%s: obj %p\n", __func__, obj_request); + obj_req = rbd_obj_request_create(); + if (!obj_req) + return NULL; - rbd_assert(obj_request->type == OBJ_REQUEST_BIO || - obj_request->type == OBJ_REQUEST_NODATA); - rbd_assert(obj_request_img_data_test(obj_request)); - img_request = obj_request->img_request; - rbd_assert(img_request); + rbd_img_obj_request_add(img_req, obj_req); + return &obj_req->ex; +} - rbd_dev = img_request->rbd_dev; - rbd_assert(rbd_dev); +/* + * While su != os && sc == 1 is technically not fancy (it's the same + * layout as su == os && sc == 1), we can't use the nocopy path for it + * because ->set_pos_fn() should be called only once per object. + * ceph_file_to_extents() invokes action_fn once per stripe unit, so + * treat su != os && sc == 1 as fancy. + */ +static bool rbd_layout_is_fancy(struct ceph_file_layout *l) +{ + return l->stripe_unit != l->object_size; +} - pages = obj_request->copyup_pages; - rbd_assert(pages != NULL); - obj_request->copyup_pages = NULL; - page_count = obj_request->copyup_page_count; - rbd_assert(page_count); - obj_request->copyup_page_count = 0; - ceph_release_page_vector(pages, page_count); +static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct rbd_img_fill_ctx *fctx) +{ + u32 i; + int ret; + + img_req->data_type = fctx->pos_type; /* - * We want the transfer count to reflect the size of the - * original write request. There is no such thing as a - * successful short write, so if the request was successful - * we can just set it to the originally-requested length. + * Create object requests and set each object request's starting + * position in the provided bio (list) or bio_vec array. 
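An example of why su != os with sc == 1 has to take the copying path, with assumed values:

    /*
     * su = 1 MiB, os = 4 MiB, sc = 1: placement is identical to su == os,
     * but ceph_file_to_extents() invokes the action_fn once per 1 MiB
     * stripe unit, i.e. up to four times per object, while set_pos_fn()
     * must run exactly once per object request.
     */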
*/ - if (!obj_request->result) - obj_request->xferred = obj_request->length; + fctx->iter = *fctx->pos; + for (i = 0; i < num_img_extents; i++) { + ret = ceph_file_to_extents(&img_req->rbd_dev->layout, + img_extents[i].fe_off, + img_extents[i].fe_len, + &img_req->object_extents, + alloc_object_extent, img_req, + fctx->set_pos_fn, &fctx->iter); + if (ret) + return ret; + } - obj_request_done_set(obj_request); + return __rbd_img_fill_request(img_req); } -static void -rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) +/* + * Map a list of image extents to a list of object extents, create the + * corresponding object requests (normally each to a different object, + * but not always) and add them to @img_req. For each object request, + * set up its data descriptor to point to the corresponding chunk(s) of + * @fctx->pos data buffer. + * + * Because ceph_file_to_extents() will merge adjacent object extents + * together, each object request's data descriptor may point to multiple + * different chunks of @fctx->pos data buffer. + * + * @fctx->pos data buffer is assumed to be large enough. + */ +static int rbd_img_fill_request(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct rbd_img_fill_ctx *fctx) { - struct rbd_obj_request *orig_request; - struct ceph_osd_request *osd_req; - struct rbd_device *rbd_dev; - struct page **pages; - enum obj_operation_type op_type; - u32 page_count; - int img_result; - u64 parent_length; - - rbd_assert(img_request_child_test(img_request)); - - /* First get what we need from the image request */ - - pages = img_request->copyup_pages; - rbd_assert(pages != NULL); - img_request->copyup_pages = NULL; - page_count = img_request->copyup_page_count; - rbd_assert(page_count); - img_request->copyup_page_count = 0; - - orig_request = img_request->obj_request; - rbd_assert(orig_request != NULL); - rbd_assert(obj_request_type_valid(orig_request->type)); - img_result = img_request->result; - parent_length = img_request->length; - rbd_assert(img_result || parent_length == img_request->xferred); - rbd_img_request_put(img_request); + struct rbd_device *rbd_dev = img_req->rbd_dev; + struct rbd_obj_request *obj_req; + u32 i; + int ret; - rbd_assert(orig_request->img_request); - rbd_dev = orig_request->img_request->rbd_dev; - rbd_assert(rbd_dev); + if (fctx->pos_type == OBJ_REQUEST_NODATA || + !rbd_layout_is_fancy(&rbd_dev->layout)) + return rbd_img_fill_request_nocopy(img_req, img_extents, + num_img_extents, fctx); + + img_req->data_type = OBJ_REQUEST_OWN_BVECS; /* - * If the overlap has become 0 (most likely because the - * image has been flattened) we need to free the pages - * and re-submit the original write request. + * Create object requests and determine ->bvec_count for each object + * request. Note that ->bvec_count sum over all object requests may + * be greater than the number of bio_vecs in the provided bio (list) + * or bio_vec array because when mapped, those bio_vecs can straddle + * stripe unit boundaries. 
*/ - if (!rbd_dev->parent_overlap) { - ceph_release_page_vector(pages, page_count); - rbd_obj_request_submit(orig_request); - return; + fctx->iter = *fctx->pos; + for (i = 0; i < num_img_extents; i++) { + ret = ceph_file_to_extents(&rbd_dev->layout, + img_extents[i].fe_off, + img_extents[i].fe_len, + &img_req->object_extents, + alloc_object_extent, img_req, + fctx->count_fn, &fctx->iter); + if (ret) + return ret; } - if (img_result) - goto out_err; + for_each_obj_request(img_req, obj_req) { + obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count, + sizeof(*obj_req->bvec_pos.bvecs), + GFP_NOIO); + if (!obj_req->bvec_pos.bvecs) + return -ENOMEM; + } /* - * The original osd request is of no use to use any more. - * We need a new one that can hold the three ops in a copyup - * request. Allocate the new copyup osd request for the - * original request, and release the old one. + * Fill in each object request's private bio_vec array, splitting and + * rearranging the provided bio_vecs in stripe unit chunks as needed. */ - img_result = -ENOMEM; - osd_req = rbd_osd_req_create_copyup(orig_request); - if (!osd_req) - goto out_err; - rbd_osd_req_destroy(orig_request->osd_req); - orig_request->osd_req = osd_req; - orig_request->copyup_pages = pages; - orig_request->copyup_page_count = page_count; + fctx->iter = *fctx->pos; + for (i = 0; i < num_img_extents; i++) { + ret = ceph_iterate_extents(&rbd_dev->layout, + img_extents[i].fe_off, + img_extents[i].fe_len, + &img_req->object_extents, + fctx->copy_fn, &fctx->iter); + if (ret) + return ret; + } - /* Initialize the copyup op */ + return __rbd_img_fill_request(img_req); +} - osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup"); - osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0, - false, false); +static int rbd_img_fill_nodata(struct rbd_img_request *img_req, + u64 off, u64 len) +{ + struct ceph_file_extent ex = { off, len }; + union rbd_img_fill_iter dummy; + struct rbd_img_fill_ctx fctx = { + .pos_type = OBJ_REQUEST_NODATA, + .pos = &dummy, + }; - /* Add the other op(s) */ + return rbd_img_fill_request(img_req, &ex, 1, &fctx); +} - op_type = rbd_img_request_op_type(orig_request->img_request); - rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1); +static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bio_iter *it = arg; - /* All set, send it off. */ + dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); + obj_req->bio_pos = *it; + ceph_bio_iter_advance(it, bytes); +} - rbd_obj_request_submit(orig_request); - return; +static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bio_iter *it = arg; + + dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); + ceph_bio_iter_advance_step(it, bytes, ({ + obj_req->bvec_count++; + })); -out_err: - ceph_release_page_vector(pages, page_count); - rbd_obj_request_error(orig_request, img_result); } -/* - * Read from the parent image the range of data that covers the - * entire target of the given object request. This is used for - * satisfying a layered image write request when the target of an - * object request from the image request does not exist. - * - * A page array big enough to hold the returned data is allocated - * and supplied to rbd_img_request_fill() as the "data descriptor." 
- * When the read completes, this page array will be transferred to - * the original object request for the copyup operation. - * - * If an error occurs, it is recorded as the result of the original - * object request in rbd_img_obj_exists_callback(). - */ -static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) -{ - struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; - struct rbd_img_request *parent_request = NULL; - u64 img_offset; - u64 length; - struct page **pages = NULL; - u32 page_count; - int result; +static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bio_iter *it = arg; - rbd_assert(rbd_dev->parent != NULL); + dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes); + ceph_bio_iter_advance_step(it, bytes, ({ + obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; + obj_req->bvec_pos.iter.bi_size += bv.bv_len; + })); +} - /* - * Determine the byte range covered by the object in the - * child image to which the original request was to be sent. - */ - img_offset = obj_request->img_offset - obj_request->offset; - length = rbd_obj_bytes(&rbd_dev->header); +static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct ceph_bio_iter *bio_pos) +{ + struct rbd_img_fill_ctx fctx = { + .pos_type = OBJ_REQUEST_BIO, + .pos = (union rbd_img_fill_iter *)bio_pos, + .set_pos_fn = set_bio_pos, + .count_fn = count_bio_bvecs, + .copy_fn = copy_bio_bvecs, + }; - /* - * There is no defined parent data beyond the parent - * overlap, so limit what we read at that boundary if - * necessary. - */ - if (img_offset + length > rbd_dev->parent_overlap) { - rbd_assert(img_offset < rbd_dev->parent_overlap); - length = rbd_dev->parent_overlap - img_offset; - } + return rbd_img_fill_request(img_req, img_extents, num_img_extents, + &fctx); +} - /* - * Allocate a page array big enough to receive the data read - * from the parent. 
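The fill context above is a small strategy object: one walk over the image extents is reused with three different callbacks (set position, count, copy). A minimal sketch of the two-pass shape -- count chunks first so the destination array can be allocated exactly, then walk again to fill it. The toy splitter, which fires once per stripe-unit-sized piece, and all names here are ours:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef void (*chunk_fn_t)(uint64_t off, uint32_t len, void *arg);

/* invoke fn once per stripe-unit-sized chunk of [off, off+len) */
static void for_each_chunk(uint64_t off, uint64_t len, uint64_t su,
			   chunk_fn_t fn, void *arg)
{
	while (len) {
		uint32_t step = (uint32_t)(su - off % su);

		if (step > len)
			step = (uint32_t)len;
		fn(off, step, arg);
		off += step;
		len -= step;
	}
}

static void count_chunks(uint64_t off, uint32_t len, void *arg)
{
	(void)off;
	(void)len;
	(*(unsigned *)arg)++;		/* first pass: just count */
}

struct fill { uint32_t *lens; unsigned idx; };

static void copy_chunks(uint64_t off, uint32_t len, void *arg)
{
	struct fill *f = arg;

	(void)off;
	f->lens[f->idx++] = len;	/* second pass: record each piece */
}

int main(void)
{
	unsigned n = 0;
	struct fill f = { .idx = 0 };

	/* pass 1: count, so the array can be sized exactly */
	for_each_chunk(3000, 10000, 4096, count_chunks, &n);
	f.lens = calloc(n, sizeof(*f.lens));
	if (!f.lens)
		return 1;
	/* pass 2: identical walk, now filling the array */
	for_each_chunk(3000, 10000, 4096, copy_chunks, &f);
	for (unsigned i = 0; i < n; i++)
		printf("chunk %u: %u bytes\n", i, f.lens[i]);
	free(f.lens);
	return 0;
}

This mirrors why count_fn and copy_fn must traverse the same iterator state: the second pass replays the first, so fctx->iter is reset to *fctx->pos before each walk.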
- */ - page_count = (u32)calc_pages_for(0, length); - pages = ceph_alloc_page_vector(page_count, GFP_NOIO); - if (IS_ERR(pages)) { - result = PTR_ERR(pages); - pages = NULL; - goto out_err; - } +static int rbd_img_fill_from_bio(struct rbd_img_request *img_req, + u64 off, u64 len, struct bio *bio) +{ + struct ceph_file_extent ex = { off, len }; + struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter }; - result = -ENOMEM; - parent_request = rbd_parent_request_create(obj_request, - img_offset, length); - if (!parent_request) - goto out_err; + return __rbd_img_fill_from_bio(img_req, &ex, 1, &it); +} - result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); - if (result) - goto out_err; +static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bvec_iter *it = arg; - parent_request->copyup_pages = pages; - parent_request->copyup_page_count = page_count; - parent_request->callback = rbd_img_obj_parent_read_full_callback; + obj_req->bvec_pos = *it; + ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes); + ceph_bvec_iter_advance(it, bytes); +} - result = rbd_img_request_submit(parent_request); - if (!result) - return 0; +static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) +{ + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bvec_iter *it = arg; - parent_request->copyup_pages = NULL; - parent_request->copyup_page_count = 0; -out_err: - if (pages) - ceph_release_page_vector(pages, page_count); - if (parent_request) - rbd_img_request_put(parent_request); - return result; + ceph_bvec_iter_advance_step(it, bytes, ({ + obj_req->bvec_count++; + })); } -static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) +static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg) { - struct rbd_obj_request *orig_request; - struct rbd_device *rbd_dev; - int result; + struct rbd_obj_request *obj_req = + container_of(ex, struct rbd_obj_request, ex); + struct ceph_bvec_iter *it = arg; - rbd_assert(!obj_request_img_data_test(obj_request)); + ceph_bvec_iter_advance_step(it, bytes, ({ + obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv; + obj_req->bvec_pos.iter.bi_size += bv.bv_len; + })); +} - /* - * All we need from the object request is the original - * request and the result of the STAT op. Grab those, then - * we're done with the request. - */ - orig_request = obj_request->obj_request; - obj_request->obj_request = NULL; - rbd_obj_request_put(orig_request); - rbd_assert(orig_request); - rbd_assert(orig_request->img_request); - - result = obj_request->result; - obj_request->result = 0; - - dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, - obj_request, orig_request, result, - obj_request->xferred, obj_request->length); - rbd_obj_request_put(obj_request); +static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct ceph_bvec_iter *bvec_pos) +{ + struct rbd_img_fill_ctx fctx = { + .pos_type = OBJ_REQUEST_BVECS, + .pos = (union rbd_img_fill_iter *)bvec_pos, + .set_pos_fn = set_bvec_pos, + .count_fn = count_bvecs, + .copy_fn = copy_bvecs, + }; - /* - * If the overlap has become 0 (most likely because the - * image has been flattened) we need to re-submit the - * original request. 
- */ - rbd_dev = orig_request->img_request->rbd_dev; - if (!rbd_dev->parent_overlap) { - rbd_obj_request_submit(orig_request); - return; - } + return rbd_img_fill_request(img_req, img_extents, num_img_extents, + &fctx); +} - /* - * Our only purpose here is to determine whether the object - * exists, and we don't want to treat the non-existence as - * an error. If something else comes back, transfer the - * error to the original request and complete it now. - */ - if (!result) { - obj_request_existence_set(orig_request, true); - } else if (result == -ENOENT) { - obj_request_existence_set(orig_request, false); - } else { - goto fail_orig_request; - } +static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req, + struct ceph_file_extent *img_extents, + u32 num_img_extents, + struct bio_vec *bvecs) +{ + struct ceph_bvec_iter it = { + .bvecs = bvecs, + .iter = { .bi_size = ceph_file_extents_bytes(img_extents, + num_img_extents) }, + }; - /* - * Resubmit the original request now that we have recorded - * whether the target object exists. - */ - result = rbd_img_obj_request_submit(orig_request); - if (result) - goto fail_orig_request; + return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents, + &it); +} - return; +static void rbd_img_request_submit(struct rbd_img_request *img_request) +{ + struct rbd_obj_request *obj_request; + + dout("%s: img %p\n", __func__, img_request); + + rbd_img_request_get(img_request); + for_each_obj_request(img_request, obj_request) + rbd_obj_request_submit(obj_request); -fail_orig_request: - rbd_obj_request_error(orig_request, result); + rbd_img_request_put(img_request); } -static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) +static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) { - struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; - struct rbd_obj_request *stat_request; - struct page **pages; - u32 page_count; - size_t size; + struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_img_request *child_img_req; int ret; - stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES); - if (!stat_request) + child_img_req = rbd_img_request_create(img_req->rbd_dev->parent, + OBJ_OP_READ, NULL); + if (!child_img_req) return -ENOMEM; - stat_request->object_no = obj_request->object_no; + __set_bit(IMG_REQ_CHILD, &child_img_req->flags); + child_img_req->obj_request = obj_req; - stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - stat_request); - if (!stat_request->osd_req) { - ret = -ENOMEM; - goto fail_stat_request; + if (!rbd_img_is_write(img_req)) { + switch (img_req->data_type) { + case OBJ_REQUEST_BIO: + ret = __rbd_img_fill_from_bio(child_img_req, + obj_req->img_extents, + obj_req->num_img_extents, + &obj_req->bio_pos); + break; + case OBJ_REQUEST_BVECS: + case OBJ_REQUEST_OWN_BVECS: + ret = __rbd_img_fill_from_bvecs(child_img_req, + obj_req->img_extents, + obj_req->num_img_extents, + &obj_req->bvec_pos); + break; + default: + rbd_assert(0); + } + } else { + ret = rbd_img_fill_from_bvecs(child_img_req, + obj_req->img_extents, + obj_req->num_img_extents, + obj_req->copyup_bvecs); + } + if (ret) { + rbd_img_request_put(child_img_req); + return ret; + } + + rbd_img_request_submit(child_img_req); + return 0; +} + +static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; + + if (obj_req->result == -ENOENT && + rbd_dev->parent_overlap && !obj_req->tried_parent) { + /* reverse map this object 
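Layered reads bottom out in the parent image: if the child object is absent and the extent falls within the parent overlap, the data comes from the parent (the tried_parent flag below keeps this from looping); past the overlap, the absence is a hole. A toy model of that policy, with our own names and one-byte granularity for brevity:

#include <stdint.h>
#include <stdio.h>

struct image {
	struct image *parent;
	uint64_t overlap;	/* bytes of parent visible through clone */
	const char *data;	/* NULL = object doesn't exist */
	uint64_t size;
};

/* fall back to the parent within the overlap, zero-fill holes */
static int read_byte(struct image *img, uint64_t off, char *out)
{
	if (img->data && off < img->size) {
		*out = img->data[off];
		return 0;
	}
	if (img->parent && off < img->overlap)
		return read_byte(img->parent, off, out);
	*out = 0;	/* -ENOENT past the overlap means a hole */
	return 0;
}

int main(void)
{
	struct image base = { NULL, 0, "base", 4 };
	struct image clone = { &base, 2, NULL, 0 };	/* empty clone */
	char c;

	read_byte(&clone, 1, &c);	/* within overlap: parent serves it */
	printf("off 1: %c\n", c);
	read_byte(&clone, 3, &c);	/* beyond overlap: zero-filled */
	printf("off 3: %d\n", c);
	return 0;
}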
extent onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, false); + if (ret) { + obj_req->result = ret; + return true; + } + + if (obj_req->num_img_extents) { + obj_req->tried_parent = true; + ret = rbd_obj_read_from_parent(obj_req); + if (ret) { + obj_req->result = ret; + return true; + } + return false; + } } /* - * The response data for a STAT call consists of: - * le64 length; - * struct { - * le32 tv_sec; - * le32 tv_nsec; - * } mtime; + * -ENOENT means a hole in the image -- zero-fill the entire + * length of the request. A short read also implies zero-fill + * to the end of the request. In both cases we update xferred + * count to indicate the whole request was satisfied. */ - size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); - page_count = (u32)calc_pages_for(0, size); - pages = ceph_alloc_page_vector(page_count, GFP_NOIO); - if (IS_ERR(pages)) { - ret = PTR_ERR(pages); - goto fail_stat_request; + if (obj_req->result == -ENOENT || + (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) { + rbd_assert(!obj_req->xferred || !obj_req->result); + rbd_obj_zero_range(obj_req, obj_req->xferred, + obj_req->ex.oe_len - obj_req->xferred); + obj_req->result = 0; + obj_req->xferred = obj_req->ex.oe_len; } - osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); - osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, - false, false); - - rbd_obj_request_get(obj_request); - stat_request->obj_request = obj_request; - stat_request->pages = pages; - stat_request->page_count = page_count; - stat_request->callback = rbd_img_obj_exists_callback; + return true; +} - rbd_obj_request_submit(stat_request); - return 0; +/* + * copyup_bvecs pages are never highmem pages + */ +static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) +{ + struct ceph_bvec_iter it = { + .bvecs = bvecs, + .iter = { .bi_size = bytes }, + }; -fail_stat_request: - rbd_obj_request_put(stat_request); - return ret; + ceph_bvec_iter_advance_step(&it, bytes, ({ + if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0, + bv.bv_len)) + return false; + })); + return true; } -static bool img_obj_request_simple(struct rbd_obj_request *obj_request) +static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) { - struct rbd_img_request *img_request = obj_request->img_request; - struct rbd_device *rbd_dev = img_request->rbd_dev; + unsigned int num_osd_ops = obj_req->osd_req->r_num_ops; - /* Reads */ - if (!img_request_write_test(img_request) && - !img_request_discard_test(img_request)) - return true; - - /* Non-layered writes */ - if (!img_request_layered_test(img_request)) - return true; + dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); + rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); + rbd_osd_req_destroy(obj_req->osd_req); /* - * Layered writes outside of the parent overlap range don't - * share any data with the parent. + * Create a copyup request with the same number of OSD ops as + * the original request. The original request was stat + op(s), + * the new copyup request will be copyup + the same op(s). */ - if (!obj_request_overlaps_parent(obj_request)) - return true; + obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); + if (!obj_req->osd_req) + return -ENOMEM; /* - * Entire-object layered writes - we will overwrite whatever - * parent data there is anyway. + * Only send non-zero copyup data to save some I/O and network + * bandwidth -- zero copyup data is equivalent to the object not + * existing. 
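is_zero_bvecs() below leans on memchr_inv() for the all-zero test; a userspace equivalent of that check, with first_nonzero() standing in for the kernel helper:

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* stand-in for memchr_inv(p, 0, n): first byte != 0, or NULL */
static const void *first_nonzero(const void *s, size_t n)
{
	const unsigned char *p = s;

	for (size_t i = 0; i < n; i++)
		if (p[i])
			return p + i;
	return NULL;
}

static bool is_zero_buf(const void *buf, size_t n)
{
	return first_nonzero(buf, n) == NULL;
}

int main(void)
{
	unsigned char a[8] = { 0 }, b[8] = { 0, 0, 1, 0, 0, 0, 0, 0 };

	/* all-zero copyup data can be sent as bytes == 0 (no payload) */
	printf("a zero=%d b zero=%d\n", is_zero_buf(a, 8), is_zero_buf(b, 8));
	return 0;
}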
*/ - if (!obj_request->offset && - obj_request->length == rbd_obj_bytes(&rbd_dev->header)) - return true; + if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { + dout("%s obj_req %p detected zeroes\n", __func__, obj_req); + bytes = 0; + } - /* - * If the object is known to already exist, its parent data has - * already been copied. - */ - if (obj_request_known_test(obj_request) && - obj_request_exists_test(obj_request)) - return true; + osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", + "copyup"); + osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, + obj_req->copyup_bvecs, bytes); + + switch (obj_req->img_request->op_type) { + case OBJ_OP_WRITE: + __rbd_obj_setup_write(obj_req, 1); + break; + case OBJ_OP_DISCARD: + rbd_assert(!rbd_obj_is_entire(obj_req)); + __rbd_obj_setup_discard(obj_req, 1); + break; + default: + rbd_assert(0); + } - return false; + rbd_obj_request_submit(obj_req); + return 0; } -static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) +static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) { - rbd_assert(obj_request_img_data_test(obj_request)); - rbd_assert(obj_request_type_valid(obj_request->type)); - rbd_assert(obj_request->img_request); + u32 i; - if (img_obj_request_simple(obj_request)) { - rbd_obj_request_submit(obj_request); - return 0; - } + rbd_assert(!obj_req->copyup_bvecs); + obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap); + obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count, + sizeof(*obj_req->copyup_bvecs), + GFP_NOIO); + if (!obj_req->copyup_bvecs) + return -ENOMEM; - /* - * It's a layered write. The target object might exist but - * we may not know that yet. If we know it doesn't exist, - * start by reading the data for the full target object from - * the parent so we can use it for a copyup to the target. - */ - if (obj_request_known_test(obj_request)) - return rbd_img_obj_parent_read_full(obj_request); + for (i = 0; i < obj_req->copyup_bvec_count; i++) { + unsigned int len = min(obj_overlap, (u64)PAGE_SIZE); - /* We don't know whether the target exists. Go find out. */ + obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO); + if (!obj_req->copyup_bvecs[i].bv_page) + return -ENOMEM; + + obj_req->copyup_bvecs[i].bv_offset = 0; + obj_req->copyup_bvecs[i].bv_len = len; + obj_overlap -= len; + } - return rbd_img_obj_exists_submit(obj_request); + rbd_assert(!obj_overlap); + return 0; } -static int rbd_img_request_submit(struct rbd_img_request *img_request) +static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) { - struct rbd_obj_request *obj_request; - struct rbd_obj_request *next_obj_request; - int ret = 0; - - dout("%s: img %p\n", __func__, img_request); + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; - rbd_img_request_get(img_request); - for_each_obj_request_safe(img_request, obj_request, next_obj_request) { - ret = rbd_img_obj_request_submit(obj_request); - if (ret) - goto out_put_ireq; + rbd_assert(obj_req->num_img_extents); + prune_extents(obj_req->img_extents, &obj_req->num_img_extents, + rbd_dev->parent_overlap); + if (!obj_req->num_img_extents) { + /* + * The overlap has become 0 (most likely because the + * image has been flattened). Use rbd_obj_issue_copyup() + * to re-submit the original write request -- the copyup + * operation itself will be a no-op, since someone must + * have populated the child object while we weren't + * looking. 
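setup_copyup_bvecs() below sizes the bvec array with calc_pages_for() and hands each bvec at most one page. The same arithmetic in a standalone sketch, with TOY_PAGE_SIZE standing in for PAGE_SIZE:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define TOY_PAGE_SIZE 4096u

/* same arithmetic as calc_pages_for(off, len) */
static unsigned pages_for(uint64_t off, uint64_t len)
{
	return (unsigned)((off % TOY_PAGE_SIZE + len + TOY_PAGE_SIZE - 1) /
			  TOY_PAGE_SIZE);
}

int main(void)
{
	uint64_t overlap = 10000, left = overlap;
	unsigned count = pages_for(0, overlap);

	for (unsigned i = 0; i < count; i++) {
		uint64_t len = left < TOY_PAGE_SIZE ? left : TOY_PAGE_SIZE;

		printf("bvec %u: %llu bytes\n", i, (unsigned long long)len);
		left -= len;
	}
	assert(left == 0);	/* mirrors rbd_assert(!obj_overlap) */
	return 0;
}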
Move to WRITE_FLAT state as we'll be done + * with the operation once the null copyup completes. + */ + obj_req->write_state = RBD_OBJ_WRITE_FLAT; + return rbd_obj_issue_copyup(obj_req, 0); } -out_put_ireq: - rbd_img_request_put(img_request); - return ret; + ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); + if (ret) + return ret; + + obj_req->write_state = RBD_OBJ_WRITE_COPYUP; + return rbd_obj_read_from_parent(obj_req); } -static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) +static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) { - struct rbd_obj_request *obj_request; - struct rbd_device *rbd_dev; - u64 obj_end; - u64 img_xferred; - int img_result; + int ret; - rbd_assert(img_request_child_test(img_request)); +again: + switch (obj_req->write_state) { + case RBD_OBJ_WRITE_GUARD: + rbd_assert(!obj_req->xferred); + if (obj_req->result == -ENOENT) { + /* + * The target object doesn't exist. Read the data for + * the entire target object up to the overlap point (if + * any) from the parent, so we can use it for a copyup. + */ + ret = rbd_obj_handle_write_guard(obj_req); + if (ret) { + obj_req->result = ret; + return true; + } + return false; + } + /* fall through */ + case RBD_OBJ_WRITE_FLAT: + if (!obj_req->result) + /* + * There is no such thing as a successful short + * write -- indicate the whole request was satisfied. + */ + obj_req->xferred = obj_req->ex.oe_len; + return true; + case RBD_OBJ_WRITE_COPYUP: + obj_req->write_state = RBD_OBJ_WRITE_GUARD; + if (obj_req->result) + goto again; - /* First get what we need from the image request and release it */ + rbd_assert(obj_req->xferred); + ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); + if (ret) { + obj_req->result = ret; + return true; + } + return false; + default: + rbd_assert(0); + } +} - obj_request = img_request->obj_request; - img_xferred = img_request->xferred; - img_result = img_request->result; - rbd_img_request_put(img_request); +/* + * Returns true if @obj_req is completed, or false otherwise. + */ +static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req) +{ + switch (obj_req->img_request->op_type) { + case OBJ_OP_READ: + return rbd_obj_handle_read(obj_req); + case OBJ_OP_WRITE: + return rbd_obj_handle_write(obj_req); + case OBJ_OP_DISCARD: + if (rbd_obj_handle_write(obj_req)) { + /* + * Hide -ENOENT from delete/truncate/zero -- discarding + * a non-existent object is not a problem. + */ + if (obj_req->result == -ENOENT) { + obj_req->result = 0; + obj_req->xferred = obj_req->ex.oe_len; + } + return true; + } + return false; + default: + rbd_assert(0); + } +} - /* - * If the overlap has become 0 (most likely because the - * image has been flattened) we need to re-submit the - * original request. 
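A condensed model of the write state machine driven by rbd_obj_handle_write() above: GUARD sees -ENOENT and triggers a parent read (state COPYUP); the parent-read completion issues the copyup and returns to GUARD; the next completion finishes the request. The sketch collapses the actual I/O into comments and simplifies error handling; names are ours:

#include <errno.h>
#include <stdbool.h>
#include <stdio.h>

enum wstate { W_FLAT = 1, W_GUARD, W_COPYUP };

struct wreq {
	enum wstate state;
	int result;	/* result of the completion being handled */
};

/* returns true when the request is finished */
static bool handle_write(struct wreq *r)
{
	for (;;) {
		switch (r->state) {
		case W_GUARD:
			if (r->result == -ENOENT) {
				/* target object absent: read parent data */
				r->state = W_COPYUP;
				return false;
			}
			/* fall through: guarded write behaves like FLAT */
		case W_FLAT:
			return true;
		case W_COPYUP:
			/* parent read done: issue copyup + original op(s) */
			r->state = W_GUARD;
			if (r->result)
				continue;	/* error: finish via GUARD */
			return false;
		}
	}
}

int main(void)
{
	struct wreq r = { .state = W_GUARD, .result = -ENOENT };

	printf("done=%d\n", handle_write(&r));	/* 0: parent read issued */
	r.result = 0;				/* parent read succeeded */
	printf("done=%d\n", handle_write(&r));	/* 0: copyup issued */
	printf("done=%d\n", handle_write(&r));	/* 1: copyup completed */
	return 0;
}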
- */ - rbd_assert(obj_request); - rbd_assert(obj_request->img_request); - rbd_dev = obj_request->img_request->rbd_dev; - if (!rbd_dev->parent_overlap) { - rbd_obj_request_submit(obj_request); +static void rbd_obj_end_request(struct rbd_obj_request *obj_req) +{ + struct rbd_img_request *img_req = obj_req->img_request; + + rbd_assert((!obj_req->result && + obj_req->xferred == obj_req->ex.oe_len) || + (obj_req->result < 0 && !obj_req->xferred)); + if (!obj_req->result) { + img_req->xferred += obj_req->xferred; return; } - obj_request->result = img_result; - if (obj_request->result) - goto out; + rbd_warn(img_req->rbd_dev, + "%s at objno %llu %llu~%llu result %d xferred %llu", + obj_op_name(img_req->op_type), obj_req->ex.oe_objno, + obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result, + obj_req->xferred); + if (!img_req->result) { + img_req->result = obj_req->result; + img_req->xferred = 0; + } +} - /* - * We need to zero anything beyond the parent overlap - * boundary. Since rbd_img_obj_request_read_callback() - * will zero anything beyond the end of a short read, an - * easy way to do this is to pretend the data from the - * parent came up short--ending at the overlap boundary. - */ - rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length); - obj_end = obj_request->img_offset + obj_request->length; - if (obj_end > rbd_dev->parent_overlap) { - u64 xferred = 0; +static void rbd_img_end_child_request(struct rbd_img_request *img_req) +{ + struct rbd_obj_request *obj_req = img_req->obj_request; - if (obj_request->img_offset < rbd_dev->parent_overlap) - xferred = rbd_dev->parent_overlap - - obj_request->img_offset; + rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags)); + rbd_assert((!img_req->result && + img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) || + (img_req->result < 0 && !img_req->xferred)); - obj_request->xferred = min(img_xferred, xferred); - } else { - obj_request->xferred = img_xferred; - } -out: - rbd_img_obj_request_read_callback(obj_request); - rbd_obj_request_complete(obj_request); + obj_req->result = img_req->result; + obj_req->xferred = img_req->xferred; + rbd_img_request_put(img_req); } -static void rbd_img_parent_read(struct rbd_obj_request *obj_request) +static void rbd_img_end_request(struct rbd_img_request *img_req) { - struct rbd_img_request *img_request; - int result; + rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); + rbd_assert((!img_req->result && + img_req->xferred == blk_rq_bytes(img_req->rq)) || + (img_req->result < 0 && !img_req->xferred)); - rbd_assert(obj_request_img_data_test(obj_request)); - rbd_assert(obj_request->img_request != NULL); - rbd_assert(obj_request->result == (s32) -ENOENT); - rbd_assert(obj_request_type_valid(obj_request->type)); + blk_mq_end_request(img_req->rq, + errno_to_blk_status(img_req->result)); + rbd_img_request_put(img_req); +} - /* rbd_read_finish(obj_request, obj_request->length); */ - img_request = rbd_parent_request_create(obj_request, - obj_request->img_offset, - obj_request->length); - result = -ENOMEM; - if (!img_request) - goto out_err; +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req) +{ + struct rbd_img_request *img_req; - if (obj_request->type == OBJ_REQUEST_BIO) - result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, - obj_request->bio_list); - else - result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES, - obj_request->pages); - if (result) - goto out_err; +again: + if (!__rbd_obj_handle_request(obj_req)) + return; - img_request->callback = 
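The end-request helpers above implement a simple aggregation rule: successes accumulate xferred, the first error wins and zeroes the byte count, and the image request completes when pending_count drops to zero. A single-threaded sketch of that bookkeeping (the kernel does this under completion_lock; the struct and names are ours):

#include <errno.h>
#include <stdint.h>
#include <stdio.h>

struct img {
	int result;		/* first error, or 0 */
	uint64_t xferred;
	unsigned pending;	/* outstanding object requests */
};

/* one object request completed */
static void obj_end(struct img *img, int result, uint64_t len)
{
	if (!result)
		img->xferred += len;
	else if (!img->result) {
		img->result = result;	/* first error wins */
		img->xferred = 0;	/* byte count is meaningless now */
	}

	if (--img->pending == 0)
		printf("img done: result %d xferred %llu\n",
		       img->result, (unsigned long long)img->xferred);
}

int main(void)
{
	struct img img = { .pending = 3 };

	obj_end(&img, 0, 4096);
	obj_end(&img, -EIO, 0);	/* one object fails */
	obj_end(&img, 0, 4096);	/* late success; result stays -EIO */
	return 0;
}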
rbd_img_parent_read_callback; - result = rbd_img_request_submit(img_request); - if (result) - goto out_err; + img_req = obj_req->img_request; + spin_lock(&img_req->completion_lock); + rbd_obj_end_request(obj_req); + rbd_assert(img_req->pending_count); + if (--img_req->pending_count) { + spin_unlock(&img_req->completion_lock); + return; + } - return; -out_err: - if (img_request) - rbd_img_request_put(img_request); - obj_request->result = result; - obj_request->xferred = 0; - obj_request_done_set(obj_request); + spin_unlock(&img_req->completion_lock); + if (test_bit(IMG_REQ_CHILD, &img_req->flags)) { + obj_req = img_req->obj_request; + rbd_img_end_child_request(img_req); + goto again; + } + rbd_img_end_request(img_req); } static const struct rbd_client_id rbd_empty_cid; @@ -3091,8 +2674,8 @@ static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_client_id cid = rbd_get_cid(rbd_dev); - int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN; - char buf[buf_size]; + char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN]; + int buf_size = sizeof(buf); void *p = buf; dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); @@ -3610,8 +3193,8 @@ static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, u64 cookie, s32 *result) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN; - char buf[buf_size]; + char buf[4 + CEPH_ENCODING_START_BLK_LEN]; + int buf_size = sizeof(buf); int ret; if (result) { @@ -3887,7 +3470,7 @@ static void rbd_reregister_watch(struct work_struct *work) ret = rbd_dev_refresh(rbd_dev); if (ret) - rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); + rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret); } /* @@ -4070,8 +3653,7 @@ static void rbd_queue_workfn(struct work_struct *work) } } - img_request = rbd_img_request_create(rbd_dev, offset, length, op_type, - snapc); + img_request = rbd_img_request_create(rbd_dev, op_type, snapc); if (!img_request) { result = -ENOMEM; goto err_unlock; @@ -4080,18 +3662,14 @@ static void rbd_queue_workfn(struct work_struct *work) snapc = NULL; /* img_request consumes a ref */ if (op_type == OBJ_OP_DISCARD) - result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA, - NULL); + result = rbd_img_fill_nodata(img_request, offset, length); else - result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, - rq->bio); - if (result) - goto err_img_request; - - result = rbd_img_request_submit(img_request); + result = rbd_img_fill_from_bio(img_request, offset, length, + rq->bio); if (result) goto err_img_request; + rbd_img_request_submit(img_request); if (must_be_locked) up_read(&rbd_dev->lock_rwsem); return; @@ -4369,7 +3947,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); q->limits.max_sectors = queue_max_hw_sectors(q); blk_queue_max_segments(q, USHRT_MAX); - blk_queue_max_segment_size(q, segment_size); + blk_queue_max_segment_size(q, UINT_MAX); blk_queue_io_min(q, segment_size); blk_queue_io_opt(q, segment_size); @@ -5057,9 +4635,6 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) } __attribute__ ((packed)) striping_info_buf = { 0 }; size_t size = sizeof (striping_info_buf); void *p; - u64 obj_size; - u64 stripe_unit; - u64 stripe_count; int ret; ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, @@ -5071,31 +4646,9 @@ static int rbd_dev_v2_striping_info(struct 
rbd_device *rbd_dev) if (ret < size) return -ERANGE; - /* - * We don't actually support the "fancy striping" feature - * (STRIPINGV2) yet, but if the striping sizes are the - * defaults the behavior is the same as before. So find - * out, and only fail if the image has non-default values. - */ - ret = -EINVAL; - obj_size = rbd_obj_bytes(&rbd_dev->header); p = &striping_info_buf; - stripe_unit = ceph_decode_64(&p); - if (stripe_unit != obj_size) { - rbd_warn(rbd_dev, "unsupported stripe unit " - "(got %llu want %llu)", - stripe_unit, obj_size); - return -EINVAL; - } - stripe_count = ceph_decode_64(&p); - if (stripe_count != 1) { - rbd_warn(rbd_dev, "unsupported stripe count " - "(got %llu want 1)", stripe_count); - return -EINVAL; - } - rbd_dev->header.stripe_unit = stripe_unit; - rbd_dev->header.stripe_count = stripe_count; - + rbd_dev->header.stripe_unit = ceph_decode_64(&p); + rbd_dev->header.stripe_count = ceph_decode_64(&p); return 0; } @@ -5653,39 +5206,6 @@ out_err: return ret; } -/* - * Return pool id (>= 0) or a negative error code. - */ -static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) -{ - struct ceph_options *opts = rbdc->client->options; - u64 newest_epoch; - int tries = 0; - int ret; - -again: - ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name); - if (ret == -ENOENT && tries++ < 1) { - ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap", - &newest_epoch); - if (ret < 0) - return ret; - - if (rbdc->client->osdc.osdmap->epoch < newest_epoch) { - ceph_osdc_maybe_request_map(&rbdc->client->osdc); - (void) ceph_monc_wait_osdmap(&rbdc->client->monc, - newest_epoch, - opts->mount_timeout); - goto again; - } else { - /* the osdmap we have is new enough */ - return -ENOENT; - } - } - - return ret; -} - static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) { down_write(&rbd_dev->lock_rwsem); @@ -6114,7 +5634,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, } /* pick the pool */ - rc = rbd_add_get_pool_id(rbdc, spec->pool_name); + rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name); if (rc < 0) { if (rc == -ENOENT) pr_info("pool %s does not exist\n", spec->pool_name); @@ -6366,16 +5886,8 @@ static int rbd_slab_init(void) if (!rbd_obj_request_cache) goto out_err; - rbd_assert(!rbd_bio_clone); - rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0); - if (!rbd_bio_clone) - goto out_err_clone; - return 0; -out_err_clone: - kmem_cache_destroy(rbd_obj_request_cache); - rbd_obj_request_cache = NULL; out_err: kmem_cache_destroy(rbd_img_request_cache); rbd_img_request_cache = NULL; @@ -6391,10 +5903,6 @@ static void rbd_slab_exit(void) rbd_assert(rbd_img_request_cache); kmem_cache_destroy(rbd_img_request_cache); rbd_img_request_cache = NULL; - - rbd_assert(rbd_bio_clone); - bioset_free(rbd_bio_clone); - rbd_bio_clone = NULL; } static int __init rbd_init(void) diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile index 174f5709e508..a699e320393f 100644 --- a/fs/ceph/Makefile +++ b/fs/ceph/Makefile @@ -6,7 +6,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \ - export.o caps.o snap.o xattr.o \ + export.o caps.o snap.o xattr.o quota.o \ mds_client.o mdsmap.o strings.o ceph_frag.o \ debugfs.o diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index b4336b42ce3b..5f7ad3d0df2e 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -15,6 +15,7 @@ #include "mds_client.h" #include "cache.h" #include <linux/ceph/osd_client.h> +#include <linux/ceph/striper.h> /* * Ceph address space ops. 
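The striping info above is now accepted as-is: just two little-endian u64s pulled off the wire with ceph_decode_64(). An endian-neutral userspace decode of the same buffer layout (decode_le64() is our stand-in for the kernel helper):

#include <stdint.h>
#include <stdio.h>

static uint64_t decode_le64(const unsigned char **p)
{
	uint64_t v = 0;

	/* byte-wise, so it works on any host endianness */
	for (int i = 7; i >= 0; i--)
		v = (v << 8) | (*p)[i];
	*p += 8;
	return v;
}

int main(void)
{
	/* stripe_unit = 0x10000 (64K), stripe_count = 4, little-endian */
	unsigned char buf[16] = { 0, 0, 1, 0, 0, 0, 0, 0,
				  4, 0, 0, 0, 0, 0, 0, 0 };
	const unsigned char *p = buf;
	uint64_t su = decode_le64(&p);
	uint64_t sc = decode_le64(&p);

	printf("stripe_unit %llu stripe_count %llu\n",
	       (unsigned long long)su, (unsigned long long)sc);
	return 0;
}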
@@ -438,7 +439,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, { struct inode *inode = file_inode(file); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_file_info *ci = file->private_data; + struct ceph_file_info *fi = file->private_data; struct ceph_rw_context *rw_ctx; int rc = 0; int max = 0; @@ -452,7 +453,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, if (rc == 0) goto out; - rw_ctx = ceph_find_rw_context(ci); + rw_ctx = ceph_find_rw_context(fi); max = fsc->mount_options->rsize >> PAGE_SHIFT; dout("readpages %p file %p ctx %p nr_pages %d max %d\n", inode, file, rw_ctx, nr_pages, max); @@ -800,7 +801,7 @@ static int ceph_writepages_start(struct address_space *mapping, struct ceph_osd_request *req = NULL; struct ceph_writeback_ctl ceph_wbc; bool should_loop, range_whole = false; - bool stop, done = false; + bool done = false; dout("writepages_start %p (mode=%s)\n", inode, wbc->sync_mode == WB_SYNC_NONE ? "NONE" : @@ -856,7 +857,7 @@ retry: * in that range can be associated with newer snapc. * They are not writeable until we write all dirty pages * associated with 'snapc' get written */ - if (index > 0 || wbc->sync_mode != WB_SYNC_NONE) + if (index > 0) should_loop = true; dout(" non-head snapc, range whole\n"); } @@ -864,8 +865,7 @@ retry: ceph_put_snap_context(last_snapc); last_snapc = snapc; - stop = false; - while (!stop && index <= end) { + while (!done && index <= end) { int num_ops = 0, op_idx; unsigned i, pvec_pages, max_pages, locked_pages = 0; struct page **pages = NULL, **data_pages; @@ -898,16 +898,30 @@ get_more_pages: unlock_page(page); continue; } - if (strip_unit_end && (page->index > strip_unit_end)) { - dout("end of strip unit %p\n", page); + /* only if matching snap context */ + pgsnapc = page_snap_context(page); + if (pgsnapc != snapc) { + dout("page snapc %p %lld != oldest %p %lld\n", + pgsnapc, pgsnapc->seq, snapc, snapc->seq); + if (!should_loop && + !ceph_wbc.head_snapc && + wbc->sync_mode != WB_SYNC_NONE) + should_loop = true; unlock_page(page); - break; + continue; } if (page_offset(page) >= ceph_wbc.i_size) { dout("%p page eof %llu\n", page, ceph_wbc.i_size); - /* not done if range_cyclic */ - stop = true; + if (ceph_wbc.size_stable || + page_offset(page) >= i_size_read(inode)) + mapping->a_ops->invalidatepage(page, + 0, PAGE_SIZE); + unlock_page(page); + continue; + } + if (strip_unit_end && (page->index > strip_unit_end)) { + dout("end of strip unit %p\n", page); unlock_page(page); break; } @@ -921,15 +935,6 @@ get_more_pages: wait_on_page_writeback(page); } - /* only if matching snap context */ - pgsnapc = page_snap_context(page); - if (pgsnapc != snapc) { - dout("page snapc %p %lld != oldest %p %lld\n", - pgsnapc, pgsnapc->seq, snapc, snapc->seq); - unlock_page(page); - continue; - } - if (!clear_page_dirty_for_io(page)) { dout("%p !clear_page_dirty_for_io\n", page); unlock_page(page); @@ -945,19 +950,15 @@ get_more_pages: if (locked_pages == 0) { u64 objnum; u64 objoff; + u32 xlen; /* prepare async write request */ offset = (u64)page_offset(page); - len = wsize; - - rc = ceph_calc_file_object_mapping(&ci->i_layout, - offset, len, - &objnum, &objoff, - &len); - if (rc < 0) { - unlock_page(page); - break; - } + ceph_calc_file_object_mapping(&ci->i_layout, + offset, wsize, + &objnum, &objoff, + &xlen); + len = xlen; num_ops = 1; strip_unit_end = page->index + @@ -1146,7 +1147,7 @@ new_request: * we tagged for writeback prior to entering this loop. 
*/ if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE) - done = stop = true; + done = true; release_pvec_pages: dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr, diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c index 33a211b364ed..bb524c880b1e 100644 --- a/fs/ceph/cache.c +++ b/fs/ceph/cache.c @@ -51,7 +51,7 @@ static const struct fscache_cookie_def ceph_fscache_fsid_object_def = { .type = FSCACHE_COOKIE_TYPE_INDEX, }; -int ceph_fscache_register(void) +int __init ceph_fscache_register(void) { return fscache_register_netfs(&ceph_cache_netfs); } @@ -135,7 +135,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux( if (memcmp(data, &aux, sizeof(aux)) != 0) return FSCACHE_CHECKAUX_OBSOLETE; - dout("ceph inode 0x%p cached okay", ci); + dout("ceph inode 0x%p cached okay\n", ci); return FSCACHE_CHECKAUX_OKAY; } diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 0e5bd3e3344e..23dbfae16156 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -184,36 +184,54 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc, mdsc->caps_avail_count); spin_unlock(&mdsc->caps_list_lock); - for (i = have; i < need; i++) { -retry: + for (i = have; i < need; ) { cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); - if (!cap) { - if (!trimmed) { - for (j = 0; j < mdsc->max_sessions; j++) { - s = __ceph_lookup_mds_session(mdsc, j); - if (!s) - continue; - mutex_unlock(&mdsc->mutex); + if (cap) { + list_add(&cap->caps_item, &newcaps); + alloc++; + i++; + continue; + } - mutex_lock(&s->s_mutex); - max_caps = s->s_nr_caps - (need - i); - ceph_trim_caps(mdsc, s, max_caps); - mutex_unlock(&s->s_mutex); + if (!trimmed) { + for (j = 0; j < mdsc->max_sessions; j++) { + s = __ceph_lookup_mds_session(mdsc, j); + if (!s) + continue; + mutex_unlock(&mdsc->mutex); - ceph_put_mds_session(s); - mutex_lock(&mdsc->mutex); - } - trimmed = true; - goto retry; - } else { - pr_warn("reserve caps ctx=%p ENOMEM " - "need=%d got=%d\n", - ctx, need, have + alloc); - goto out_nomem; + mutex_lock(&s->s_mutex); + max_caps = s->s_nr_caps - (need - i); + ceph_trim_caps(mdsc, s, max_caps); + mutex_unlock(&s->s_mutex); + + ceph_put_mds_session(s); + mutex_lock(&mdsc->mutex); } + trimmed = true; + + spin_lock(&mdsc->caps_list_lock); + if (mdsc->caps_avail_count) { + int more_have; + if (mdsc->caps_avail_count >= need - i) + more_have = need - i; + else + more_have = mdsc->caps_avail_count; + + i += more_have; + have += more_have; + mdsc->caps_avail_count -= more_have; + mdsc->caps_reserve_count += more_have; + + } + spin_unlock(&mdsc->caps_list_lock); + + continue; } - list_add(&cap->caps_item, &newcaps); - alloc++; + + pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n", + ctx, need, have + alloc); + goto out_nomem; } BUG_ON(have + alloc != need); @@ -234,16 +252,28 @@ retry: return 0; out_nomem: + + spin_lock(&mdsc->caps_list_lock); + mdsc->caps_avail_count += have; + mdsc->caps_reserve_count -= have; + while (!list_empty(&newcaps)) { cap = list_first_entry(&newcaps, struct ceph_cap, caps_item); list_del(&cap->caps_item); - kmem_cache_free(ceph_cap_cachep, cap); + + /* Keep some preallocated caps around (ceph_min_count), to + * avoid lots of free/alloc churn. 
*/ + if (mdsc->caps_avail_count >= + mdsc->caps_reserve_count + mdsc->caps_min_count) { + kmem_cache_free(ceph_cap_cachep, cap); + } else { + mdsc->caps_avail_count++; + mdsc->caps_total_count++; + list_add(&cap->caps_item, &mdsc->caps_list); + } } - spin_lock(&mdsc->caps_list_lock); - mdsc->caps_avail_count += have; - mdsc->caps_reserve_count -= have; BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + mdsc->caps_reserve_count + mdsc->caps_avail_count); @@ -254,12 +284,26 @@ out_nomem: int ceph_unreserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx) { + int i; + struct ceph_cap *cap; + dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count); if (ctx->count) { spin_lock(&mdsc->caps_list_lock); BUG_ON(mdsc->caps_reserve_count < ctx->count); mdsc->caps_reserve_count -= ctx->count; - mdsc->caps_avail_count += ctx->count; + if (mdsc->caps_avail_count >= + mdsc->caps_reserve_count + mdsc->caps_min_count) { + mdsc->caps_total_count -= ctx->count; + for (i = 0; i < ctx->count; i++) { + cap = list_first_entry(&mdsc->caps_list, + struct ceph_cap, caps_item); + list_del(&cap->caps_item); + kmem_cache_free(ceph_cap_cachep, cap); + } + } else { + mdsc->caps_avail_count += ctx->count; + } ctx->count = 0; dout("unreserve caps %d = %d used + %d resv + %d avail\n", mdsc->caps_total_count, mdsc->caps_use_count, @@ -285,7 +329,23 @@ struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc, mdsc->caps_use_count++; mdsc->caps_total_count++; spin_unlock(&mdsc->caps_list_lock); + } else { + spin_lock(&mdsc->caps_list_lock); + if (mdsc->caps_avail_count) { + BUG_ON(list_empty(&mdsc->caps_list)); + + mdsc->caps_avail_count--; + mdsc->caps_use_count++; + cap = list_first_entry(&mdsc->caps_list, + struct ceph_cap, caps_item); + list_del(&cap->caps_item); + + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + mdsc->caps_avail_count); + } + spin_unlock(&mdsc->caps_list_lock); } + return cap; } @@ -341,6 +401,8 @@ void ceph_reservation_status(struct ceph_fs_client *fsc, { struct ceph_mds_client *mdsc = fsc->mdsc; + spin_lock(&mdsc->caps_list_lock); + if (total) *total = mdsc->caps_total_count; if (avail) @@ -351,6 +413,8 @@ void ceph_reservation_status(struct ceph_fs_client *fsc, *reserved = mdsc->caps_reserve_count; if (min) *min = mdsc->caps_min_count; + + spin_unlock(&mdsc->caps_list_lock); } /* @@ -639,9 +703,11 @@ void ceph_add_cap(struct inode *inode, } spin_lock(&realm->inodes_with_caps_lock); - ci->i_snap_realm = realm; list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps); + ci->i_snap_realm = realm; + if (realm->ino == ci->i_vino.ino) + realm->inode = inode; spin_unlock(&realm->inodes_with_caps_lock); if (oldrealm) diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 644def813754..abdf98deeec4 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -260,7 +260,7 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) goto out; fsc->debugfs_mdsmap = debugfs_create_file("mdsmap", - 0600, + 0400, fsc->client->debugfs_dir, fsc, &mdsmap_show_fops); @@ -268,7 +268,7 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) goto out; fsc->debugfs_mds_sessions = debugfs_create_file("mds_sessions", - 0600, + 0400, fsc->client->debugfs_dir, fsc, &mds_sessions_show_fops); @@ -276,7 +276,7 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) goto out; fsc->debugfs_mdsc = debugfs_create_file("mdsc", - 0600, + 0400, fsc->client->debugfs_dir, fsc, &mdsc_show_fops); @@ -292,7 +292,7 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) goto out; 
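The caps pool above maintains the invariant total == used + reserved + available (the BUG_ONs), and unreserve only frees caps once the pool already holds reserve + min spares. A toy pool with the same bookkeeping, counts only, no real cap objects; struct pool and its fields are our names:

#include <assert.h>
#include <stdio.h>

struct pool {
	int total, used, resv, avail;
	int min;	/* preallocated spares to keep around */
};

static void check(const struct pool *p)
{
	assert(p->total == p->used + p->resv + p->avail);
}

static void reserve(struct pool *p, int need)
{
	int have = need < p->avail ? need : p->avail;

	p->avail -= have;		/* reuse preallocated caps first */
	p->resv += have;
	p->total += need - have;	/* "allocate" the rest */
	p->resv += need - have;
	check(p);
}

static void unreserve(struct pool *p, int count)
{
	p->resv -= count;
	if (p->avail >= p->resv + p->min)
		p->total -= count;	/* enough spares: free these */
	else
		p->avail += count;	/* keep them to avoid churn */
	check(p);
}

int main(void)
{
	struct pool p = { .min = 2 };

	reserve(&p, 5);
	unreserve(&p, 5);	/* too few spares: caps are kept */
	printf("total %d resv %d avail %d\n", p.total, p.resv, p.avail);
	reserve(&p, 3);
	unreserve(&p, 3);	/* spares >= min now: caps are freed */
	printf("total %d resv %d avail %d\n", p.total, p.resv, p.avail);
	return 0;
}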
fsc->debugfs_dentry_lru = debugfs_create_file("dentry_lru", - 0600, + 0400, fsc->client->debugfs_dir, fsc, &dentry_lru_show_fops); diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 2bdd561c4c68..1a78dd6f8bf2 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -101,18 +101,18 @@ static int fpos_cmp(loff_t l, loff_t r) * regardless of what dir changes take place on the * server. */ -static int note_last_dentry(struct ceph_file_info *fi, const char *name, +static int note_last_dentry(struct ceph_dir_file_info *dfi, const char *name, int len, unsigned next_offset) { char *buf = kmalloc(len+1, GFP_KERNEL); if (!buf) return -ENOMEM; - kfree(fi->last_name); - fi->last_name = buf; - memcpy(fi->last_name, name, len); - fi->last_name[len] = 0; - fi->next_offset = next_offset; - dout("note_last_dentry '%s'\n", fi->last_name); + kfree(dfi->last_name); + dfi->last_name = buf; + memcpy(dfi->last_name, name, len); + dfi->last_name[len] = 0; + dfi->next_offset = next_offset; + dout("note_last_dentry '%s'\n", dfi->last_name); return 0; } @@ -174,7 +174,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx, static int __dcache_readdir(struct file *file, struct dir_context *ctx, int shared_gen) { - struct ceph_file_info *fi = file->private_data; + struct ceph_dir_file_info *dfi = file->private_data; struct dentry *parent = file->f_path.dentry; struct inode *dir = d_inode(parent); struct dentry *dentry, *last = NULL; @@ -221,7 +221,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, bool emit_dentry = false; dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl); if (!dentry) { - fi->flags |= CEPH_F_ATEND; + dfi->file_info.flags |= CEPH_F_ATEND; err = 0; break; } @@ -272,33 +272,33 @@ out: if (last) { int ret; di = ceph_dentry(last); - ret = note_last_dentry(fi, last->d_name.name, last->d_name.len, + ret = note_last_dentry(dfi, last->d_name.name, last->d_name.len, fpos_off(di->offset) + 1); if (ret < 0) err = ret; dput(last); /* last_name no longer match cache index */ - if (fi->readdir_cache_idx >= 0) { - fi->readdir_cache_idx = -1; - fi->dir_release_count = 0; + if (dfi->readdir_cache_idx >= 0) { + dfi->readdir_cache_idx = -1; + dfi->dir_release_count = 0; } } return err; } -static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos) +static bool need_send_readdir(struct ceph_dir_file_info *dfi, loff_t pos) { - if (!fi->last_readdir) + if (!dfi->last_readdir) return true; if (is_hash_order(pos)) - return !ceph_frag_contains_value(fi->frag, fpos_hash(pos)); + return !ceph_frag_contains_value(dfi->frag, fpos_hash(pos)); else - return fi->frag != fpos_frag(pos); + return dfi->frag != fpos_frag(pos); } static int ceph_readdir(struct file *file, struct dir_context *ctx) { - struct ceph_file_info *fi = file->private_data; + struct ceph_dir_file_info *dfi = file->private_data; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); @@ -309,7 +309,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) struct ceph_mds_reply_info_parsed *rinfo; dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos); - if (fi->flags & CEPH_F_ATEND) + if (dfi->file_info.flags & CEPH_F_ATEND) return 0; /* always start with . and .. */ @@ -350,15 +350,15 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* proceed with a normal readdir */ more: /* do we have the correct frag content buffered? 
*/ - if (need_send_readdir(fi, ctx->pos)) { + if (need_send_readdir(dfi, ctx->pos)) { struct ceph_mds_request *req; int op = ceph_snap(inode) == CEPH_SNAPDIR ? CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR; /* discard old result, if any */ - if (fi->last_readdir) { - ceph_mdsc_put_request(fi->last_readdir); - fi->last_readdir = NULL; + if (dfi->last_readdir) { + ceph_mdsc_put_request(dfi->last_readdir); + dfi->last_readdir = NULL; } if (is_hash_order(ctx->pos)) { @@ -372,7 +372,7 @@ more: } dout("readdir fetching %llx.%llx frag %x offset '%s'\n", - ceph_vinop(inode), frag, fi->last_name); + ceph_vinop(inode), frag, dfi->last_name); req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); if (IS_ERR(req)) return PTR_ERR(req); @@ -388,8 +388,8 @@ more: __set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); req->r_inode_drop = CEPH_CAP_FILE_EXCL; } - if (fi->last_name) { - req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL); + if (dfi->last_name) { + req->r_path2 = kstrdup(dfi->last_name, GFP_KERNEL); if (!req->r_path2) { ceph_mdsc_put_request(req); return -ENOMEM; @@ -399,10 +399,10 @@ more: cpu_to_le32(fpos_hash(ctx->pos)); } - req->r_dir_release_cnt = fi->dir_release_count; - req->r_dir_ordered_cnt = fi->dir_ordered_count; - req->r_readdir_cache_idx = fi->readdir_cache_idx; - req->r_readdir_offset = fi->next_offset; + req->r_dir_release_cnt = dfi->dir_release_count; + req->r_dir_ordered_cnt = dfi->dir_ordered_count; + req->r_readdir_cache_idx = dfi->readdir_cache_idx; + req->r_readdir_offset = dfi->next_offset; req->r_args.readdir.frag = cpu_to_le32(frag); req->r_args.readdir.flags = cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS); @@ -426,35 +426,35 @@ more: if (le32_to_cpu(rinfo->dir_dir->frag) != frag) { frag = le32_to_cpu(rinfo->dir_dir->frag); if (!rinfo->hash_order) { - fi->next_offset = req->r_readdir_offset; + dfi->next_offset = req->r_readdir_offset; /* adjust ctx->pos to beginning of frag */ ctx->pos = ceph_make_fpos(frag, - fi->next_offset, + dfi->next_offset, false); } } - fi->frag = frag; - fi->last_readdir = req; + dfi->frag = frag; + dfi->last_readdir = req; if (test_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags)) { - fi->readdir_cache_idx = req->r_readdir_cache_idx; - if (fi->readdir_cache_idx < 0) { + dfi->readdir_cache_idx = req->r_readdir_cache_idx; + if (dfi->readdir_cache_idx < 0) { /* preclude from marking dir ordered */ - fi->dir_ordered_count = 0; + dfi->dir_ordered_count = 0; } else if (ceph_frag_is_leftmost(frag) && - fi->next_offset == 2) { + dfi->next_offset == 2) { /* note dir version at start of readdir so * we can tell if any dentries get dropped */ - fi->dir_release_count = req->r_dir_release_cnt; - fi->dir_ordered_count = req->r_dir_ordered_cnt; + dfi->dir_release_count = req->r_dir_release_cnt; + dfi->dir_ordered_count = req->r_dir_ordered_cnt; } } else { - dout("readdir !did_prepopulate"); + dout("readdir !did_prepopulate\n"); /* disable readdir cache */ - fi->readdir_cache_idx = -1; + dfi->readdir_cache_idx = -1; /* preclude from marking dir complete */ - fi->dir_release_count = 0; + dfi->dir_release_count = 0; } /* note next offset and last dentry name */ @@ -463,19 +463,19 @@ more: rinfo->dir_entries + (rinfo->dir_nr-1); unsigned next_offset = req->r_reply_info.dir_end ? 
2 : (fpos_off(rde->offset) + 1); - err = note_last_dentry(fi, rde->name, rde->name_len, + err = note_last_dentry(dfi, rde->name, rde->name_len, next_offset); if (err) return err; } else if (req->r_reply_info.dir_end) { - fi->next_offset = 2; + dfi->next_offset = 2; /* keep last name */ } } - rinfo = &fi->last_readdir->r_reply_info; + rinfo = &dfi->last_readdir->r_reply_info; dout("readdir frag %x num %d pos %llx chunk first %llx\n", - fi->frag, rinfo->dir_nr, ctx->pos, + dfi->frag, rinfo->dir_nr, ctx->pos, rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL); i = 0; @@ -519,52 +519,55 @@ more: ctx->pos++; } - ceph_mdsc_put_request(fi->last_readdir); - fi->last_readdir = NULL; + ceph_mdsc_put_request(dfi->last_readdir); + dfi->last_readdir = NULL; - if (fi->next_offset > 2) { - frag = fi->frag; + if (dfi->next_offset > 2) { + frag = dfi->frag; goto more; } /* more frags? */ - if (!ceph_frag_is_rightmost(fi->frag)) { - frag = ceph_frag_next(fi->frag); + if (!ceph_frag_is_rightmost(dfi->frag)) { + frag = ceph_frag_next(dfi->frag); if (is_hash_order(ctx->pos)) { loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag), - fi->next_offset, true); + dfi->next_offset, true); if (new_pos > ctx->pos) ctx->pos = new_pos; /* keep last_name */ } else { - ctx->pos = ceph_make_fpos(frag, fi->next_offset, false); - kfree(fi->last_name); - fi->last_name = NULL; + ctx->pos = ceph_make_fpos(frag, dfi->next_offset, + false); + kfree(dfi->last_name); + dfi->last_name = NULL; } dout("readdir next frag is %x\n", frag); goto more; } - fi->flags |= CEPH_F_ATEND; + dfi->file_info.flags |= CEPH_F_ATEND; /* * if dir_release_count still matches the dir, no dentries * were released during the whole readdir, and we should have * the complete dir contents in our cache. */ - if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) { + if (atomic64_read(&ci->i_release_count) == + dfi->dir_release_count) { spin_lock(&ci->i_ceph_lock); - if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) { + if (dfi->dir_ordered_count == + atomic64_read(&ci->i_ordered_count)) { dout(" marking %p complete and ordered\n", inode); /* use i_size to track number of entries in * readdir cache */ - BUG_ON(fi->readdir_cache_idx < 0); - i_size_write(inode, fi->readdir_cache_idx * + BUG_ON(dfi->readdir_cache_idx < 0); + i_size_write(inode, dfi->readdir_cache_idx * sizeof(struct dentry*)); } else { dout(" marking %p complete\n", inode); } - __ceph_dir_set_complete(ci, fi->dir_release_count, - fi->dir_ordered_count); + __ceph_dir_set_complete(ci, dfi->dir_release_count, + dfi->dir_ordered_count); spin_unlock(&ci->i_ceph_lock); } @@ -572,25 +575,25 @@ more: return 0; } -static void reset_readdir(struct ceph_file_info *fi) +static void reset_readdir(struct ceph_dir_file_info *dfi) { - if (fi->last_readdir) { - ceph_mdsc_put_request(fi->last_readdir); - fi->last_readdir = NULL; + if (dfi->last_readdir) { + ceph_mdsc_put_request(dfi->last_readdir); + dfi->last_readdir = NULL; } - kfree(fi->last_name); - fi->last_name = NULL; - fi->dir_release_count = 0; - fi->readdir_cache_idx = -1; - fi->next_offset = 2; /* compensate for . and .. */ - fi->flags &= ~CEPH_F_ATEND; + kfree(dfi->last_name); + dfi->last_name = NULL; + dfi->dir_release_count = 0; + dfi->readdir_cache_idx = -1; + dfi->next_offset = 2; /* compensate for . and .. 
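note_last_dentry() above is cursor-based pagination: the client remembers the last name it consumed and asks the MDS for the entries after it (via r_path2), so a readdir can resume even after the cached chunk is dropped. A toy of that resume loop, assuming a name-ordered listing (the real protocol also carries frag and offset state):

#include <stdio.h>
#include <string.h>

static const char *names[] = { "a", "b", "c", "d", "e" };
#define N 5
#define CHUNK 2

/* toy server: return up to CHUNK entries strictly after `last` */
static int list_after(const char *last, const char **out)
{
	int n = 0;

	for (int i = 0; i < N && n < CHUNK; i++)
		if (!last || strcmp(names[i], last) > 0)
			out[n++] = names[i];
	return n;
}

int main(void)
{
	const char *chunk[CHUNK];
	char last[16];
	const char *lastp = NULL;
	int n;

	while ((n = list_after(lastp, chunk)) > 0) {
		for (int i = 0; i < n; i++)
			printf("%s\n", chunk[i]);
		/* note_last_dentry() analogue: remember where we stopped */
		snprintf(last, sizeof(last), "%s", chunk[n - 1]);
		lastp = last;
	}
	return 0;
}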
*/ + dfi->file_info.flags &= ~CEPH_F_ATEND; } /* * discard buffered readdir content on seekdir(0), or seek to new frag, * or seek prior to current chunk */ -static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos) +static bool need_reset_readdir(struct ceph_dir_file_info *dfi, loff_t new_pos) { struct ceph_mds_reply_info_parsed *rinfo; loff_t chunk_offset; @@ -599,10 +602,10 @@ static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos) if (is_hash_order(new_pos)) { /* no need to reset last_name for a forward seek when * dentries are sorted in hash order */ - } else if (fi->frag != fpos_frag(new_pos)) { + } else if (dfi->frag != fpos_frag(new_pos)) { return true; } - rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL; + rinfo = dfi->last_readdir ? &dfi->last_readdir->r_reply_info : NULL; if (!rinfo || !rinfo->dir_nr) return true; chunk_offset = rinfo->dir_entries[0].offset; @@ -612,7 +615,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) { - struct ceph_file_info *fi = file->private_data; + struct ceph_dir_file_info *dfi = file->private_data; struct inode *inode = file->f_mapping->host; loff_t retval; @@ -630,20 +633,20 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) } if (offset >= 0) { - if (need_reset_readdir(fi, offset)) { + if (need_reset_readdir(dfi, offset)) { dout("dir_llseek dropping %p content\n", file); - reset_readdir(fi); + reset_readdir(dfi); } else if (is_hash_order(offset) && offset > file->f_pos) { /* for hash offset, we don't know if a forward seek * is within same frag */ - fi->dir_release_count = 0; - fi->readdir_cache_idx = -1; + dfi->dir_release_count = 0; + dfi->readdir_cache_idx = -1; } if (offset != file->f_pos) { file->f_pos = offset; file->f_version = 0; - fi->flags &= ~CEPH_F_ATEND; + dfi->file_info.flags &= ~CEPH_F_ATEND; } retval = offset; } @@ -824,6 +827,9 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, if (ceph_snap(dir) != CEPH_NOSNAP) return -EROFS; + if (ceph_quota_is_max_files_exceeded(dir)) + return -EDQUOT; + err = ceph_pre_init_acls(dir, &mode, &acls); if (err < 0) return err; @@ -877,6 +883,9 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, if (ceph_snap(dir) != CEPH_NOSNAP) return -EROFS; + if (ceph_quota_is_max_files_exceeded(dir)) + return -EDQUOT; + dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS); if (IS_ERR(req)) { @@ -926,6 +935,12 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) goto out; } + if (op == CEPH_MDS_OP_MKDIR && + ceph_quota_is_max_files_exceeded(dir)) { + err = -EDQUOT; + goto out; + } + mode |= S_IFDIR; err = ceph_pre_init_acls(dir, &mode, &acls); if (err < 0) @@ -1065,6 +1080,11 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, else return -EROFS; } + /* don't allow cross-quota renames */ + if ((old_dir != new_dir) && + (!ceph_quota_is_same_realm(old_dir, new_dir))) + return -EXDEV; + dout("rename dir %p dentry %p to dir %p dentry %p\n", old_dir, old_dentry, new_dir, new_dentry); req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); @@ -1351,7 +1371,7 @@ static void ceph_d_prune(struct dentry *dentry) static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, loff_t *ppos) { - struct ceph_file_info *cf = file->private_data; + struct
ceph_dir_file_info *dfi = file->private_data; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); int left; @@ -1360,12 +1380,12 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, if (!ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT)) return -EISDIR; - if (!cf->dir_info) { - cf->dir_info = kmalloc(bufsize, GFP_KERNEL); - if (!cf->dir_info) + if (!dfi->dir_info) { + dfi->dir_info = kmalloc(bufsize, GFP_KERNEL); + if (!dfi->dir_info) return -ENOMEM; - cf->dir_info_len = - snprintf(cf->dir_info, bufsize, + dfi->dir_info_len = + snprintf(dfi->dir_info, bufsize, "entries: %20lld\n" " files: %20lld\n" " subdirs: %20lld\n" @@ -1385,10 +1405,10 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, (long)ci->i_rctime.tv_nsec); } - if (*ppos >= cf->dir_info_len) + if (*ppos >= dfi->dir_info_len) return 0; - size = min_t(unsigned, size, cf->dir_info_len-*ppos); - left = copy_to_user(buf, cf->dir_info + *ppos, size); + size = min_t(unsigned, size, dfi->dir_info_len-*ppos); + left = copy_to_user(buf, dfi->dir_info + *ppos, size); if (left == size) return -EFAULT; *ppos += (size - left); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index b67eec3532a1..f85040d73e3d 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -30,6 +30,8 @@ static __le32 ceph_flags_sys2wire(u32 flags) break; } + flags &= ~O_ACCMODE; + #define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; } ceph_sys2wire(O_CREAT); @@ -41,7 +43,7 @@ static __le32 ceph_flags_sys2wire(u32 flags) #undef ceph_sys2wire if (flags) - dout("unused open flags: %x", flags); + dout("unused open flags: %x\n", flags); return cpu_to_le32(wire_flags); } @@ -159,13 +161,50 @@ out: return req; } +static int ceph_init_file_info(struct inode *inode, struct file *file, + int fmode, bool isdir) +{ + struct ceph_file_info *fi; + + dout("%s %p %p 0%o (%s)\n", __func__, inode, file, + inode->i_mode, isdir ? "dir" : "regular"); + BUG_ON(inode->i_fop->release != ceph_release); + + if (isdir) { + struct ceph_dir_file_info *dfi = + kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL); + if (!dfi) { + ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ + return -ENOMEM; + } + + file->private_data = dfi; + fi = &dfi->file_info; + dfi->next_offset = 2; + dfi->readdir_cache_idx = -1; + } else { + fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL); + if (!fi) { + ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ + return -ENOMEM; + } + + file->private_data = fi; + } + + fi->fmode = fmode; + spin_lock_init(&fi->rw_contexts_lock); + INIT_LIST_HEAD(&fi->rw_contexts); + + return 0; +} + /* * initialize private struct file data. 
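* regular files get a plain ceph_file_info, while directories get the larger
* ceph_dir_file_info that embeds one, so readdir state is only allocated
* where it is needed.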
* if we fail, clean up by dropping fmode reference on the ceph_inode */ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) { - struct ceph_file_info *cf; int ret = 0; switch (inode->i_mode & S_IFMT) { @@ -173,22 +212,10 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) ceph_fscache_register_inode_cookie(inode); ceph_fscache_file_set_cookie(inode, file); case S_IFDIR: - dout("init_file %p %p 0%o (regular)\n", inode, file, - inode->i_mode); - cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL); - if (!cf) { - ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ - return -ENOMEM; - } - cf->fmode = fmode; - - spin_lock_init(&cf->rw_contexts_lock); - INIT_LIST_HEAD(&cf->rw_contexts); - - cf->next_offset = 2; - cf->readdir_cache_idx = -1; - file->private_data = cf; - BUG_ON(inode->i_fop->release != ceph_release); + ret = ceph_init_file_info(inode, file, fmode, + S_ISDIR(inode->i_mode)); + if (ret) + return ret; break; case S_IFLNK: @@ -278,11 +305,11 @@ int ceph_open(struct inode *inode, struct file *file) struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; - struct ceph_file_info *cf = file->private_data; + struct ceph_file_info *fi = file->private_data; int err; int flags, fmode, wanted; - if (cf) { + if (fi) { dout("open file %p is already opened\n", file); return 0; } @@ -375,7 +402,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, struct ceph_mds_request *req; struct dentry *dn; struct ceph_acls_info acls = {}; - int mask; + int mask; int err; dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n", @@ -386,6 +413,8 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, return -ENAMETOOLONG; if (flags & O_CREAT) { + if (ceph_quota_is_max_files_exceeded(dir)) + return -EDQUOT; err = ceph_pre_init_acls(dir, &mode, &acls); if (err < 0) return err; @@ -460,16 +489,27 @@ out_acl: int ceph_release(struct inode *inode, struct file *file) { struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_file_info *cf = file->private_data; - dout("release inode %p file %p\n", inode, file); - ceph_put_fmode(ci, cf->fmode); - if (cf->last_readdir) - ceph_mdsc_put_request(cf->last_readdir); - kfree(cf->last_name); - kfree(cf->dir_info); - WARN_ON(!list_empty(&cf->rw_contexts)); - kmem_cache_free(ceph_file_cachep, cf); + if (S_ISDIR(inode->i_mode)) { + struct ceph_dir_file_info *dfi = file->private_data; + dout("release inode %p dir file %p\n", inode, file); + WARN_ON(!list_empty(&dfi->file_info.rw_contexts)); + + ceph_put_fmode(ci, dfi->file_info.fmode); + + if (dfi->last_readdir) + ceph_mdsc_put_request(dfi->last_readdir); + kfree(dfi->last_name); + kfree(dfi->dir_info); + kmem_cache_free(ceph_dir_file_cachep, dfi); + } else { + struct ceph_file_info *fi = file->private_data; + dout("release inode %p regular file %p\n", inode, file); + WARN_ON(!list_empty(&fi->rw_contexts)); + + ceph_put_fmode(ci, fi->fmode); + kmem_cache_free(ceph_file_cachep, fi); + } /* wake up anyone waiting for caps on this inode */ wake_up_all(&ci->i_cap_wq); @@ -1338,6 +1378,11 @@ retry_snap: pos = iocb->ki_pos; count = iov_iter_count(from); + if (ceph_quota_is_max_bytes_exceeded(inode, pos + count)) { + err = -EDQUOT; + goto out; + } + err = file_remove_privs(file); if (err) goto out; @@ -1419,6 +1464,7 @@ retry_snap: if (written >= 0) { int dirty; + spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; dirty = __ceph_mark_dirty_caps(ci, 
CEPH_CAP_FILE_WR, @@ -1426,6 +1472,8 @@ retry_snap: spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); + if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos)) + ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL); } dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", @@ -1668,6 +1716,12 @@ static long ceph_fallocate(struct file *file, int mode, goto unlock; } + if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) && + ceph_quota_is_max_bytes_exceeded(inode, offset + length)) { + ret = -EDQUOT; + goto unlock; + } + if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !(mode & FALLOC_FL_PUNCH_HOLE)) { ret = -ENOSPC; @@ -1716,6 +1770,9 @@ static long ceph_fallocate(struct file *file, int mode, spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); + if ((endoff > size) && + ceph_quota_is_max_bytes_approaching(inode, endoff)) + ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL); } ceph_put_cap_refs(ci, got); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index c6ec5aa46100..8bf60250309e 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -441,6 +441,9 @@ struct inode *ceph_alloc_inode(struct super_block *sb) atomic64_set(&ci->i_complete_seq[1], 0); ci->i_symlink = NULL; + ci->i_max_bytes = 0; + ci->i_max_files = 0; + memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL); @@ -536,6 +539,9 @@ void ceph_destroy_inode(struct inode *inode) ceph_queue_caps_release(inode); + if (__ceph_has_any_quota(ci)) + ceph_adjust_quota_realms_count(inode, false); + /* * we may still have a snap_realm reference if there are stray * caps in i_snap_caps. @@ -548,6 +554,9 @@ void ceph_destroy_inode(struct inode *inode) dout(" dropping residual ref to snap realm %p\n", realm); spin_lock(&realm->inodes_with_caps_lock); list_del_init(&ci->i_snap_realm_item); + ci->i_snap_realm = NULL; + if (realm->ino == ci->i_vino.ino) + realm->inode = NULL; spin_unlock(&realm->inodes_with_caps_lock); ceph_put_snap_realm(mdsc, realm); } @@ -790,6 +799,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page, inode->i_rdev = le32_to_cpu(info->rdev); inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1; + __ceph_update_quota(ci, iinfo->max_bytes, iinfo->max_files); + if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) && (issued & CEPH_CAP_AUTH_EXCL) == 0) { inode->i_mode = le32_to_cpu(info->mode); @@ -1867,20 +1878,9 @@ retry: * possibly truncate them.. so write AND block! 
*/ if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) { - struct ceph_cap_snap *capsnap; - to = ci->i_truncate_size; - list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { - // MDS should have revoked Frw caps - WARN_ON_ONCE(capsnap->writing); - if (capsnap->dirty_pages && capsnap->size > to) - to = capsnap->size; - } spin_unlock(&ci->i_ceph_lock); dout("__do_pending_vmtruncate %p flushing snaps first\n", inode); - - truncate_pagecache(inode, to); - filemap_write_and_wait_range(&inode->i_data, 0, inode->i_sb->s_maxbytes); goto retry; @@ -2152,6 +2152,10 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (err != 0) return err; + if ((attr->ia_valid & ATTR_SIZE) && + ceph_quota_is_max_bytes_exceeded(inode, attr->ia_size)) + return -EDQUOT; + err = __ceph_setattr(inode, attr); if (err >= 0 && (attr->ia_valid & ATTR_MODE)) diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 851aa69ec8f0..c90f03beb15d 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -5,7 +5,7 @@ #include "super.h" #include "mds_client.h" #include "ioctl.h" - +#include <linux/ceph/striper.h> /* * ioctls @@ -185,7 +185,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) &ceph_sb_to_client(inode->i_sb)->client->osdc; struct ceph_object_locator oloc; CEPH_DEFINE_OID_ONSTACK(oid); - u64 len = 1, olen; + u32 xlen; u64 tmp; struct ceph_pg pgid; int r; @@ -195,13 +195,8 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) return -EFAULT; down_read(&osdc->lock); - r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len, - &dl.object_no, &dl.object_offset, - &olen); - if (r < 0) { - up_read(&osdc->lock); - return -EIO; - } + ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, 1, + &dl.object_no, &dl.object_offset, &xlen); dl.file_offset -= dl.object_offset; dl.object_size = ci->i_layout.object_size; dl.block_size = ci->i_layout.stripe_unit; diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c index 9e66f69ee8a5..9dae2ec7e1fa 100644 --- a/fs/ceph/locks.c +++ b/fs/ceph/locks.c @@ -95,7 +95,7 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode, owner = secure_addr(fl->fl_owner); dout("ceph_lock_message: rule: %d, op: %d, owner: %llx, pid: %llu, " - "start: %llu, length: %llu, wait: %d, type: %d", (int)lock_type, + "start: %llu, length: %llu, wait: %d, type: %d\n", (int)lock_type, (int)operation, owner, (u64)fl->fl_pid, fl->fl_start, length, wait, fl->fl_type); @@ -132,7 +132,7 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode, } ceph_mdsc_put_request(req); dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, " - "length: %llu, wait: %d, type: %d, err code %d", (int)lock_type, + "length: %llu, wait: %d, type: %d, err code %d\n", (int)lock_type, (int)operation, (u64)fl->fl_pid, fl->fl_start, length, wait, fl->fl_type, err); return err; @@ -226,7 +226,7 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl) if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK) return -ENOLCK; - dout("ceph_lock, fl_owner: %p", fl->fl_owner); + dout("ceph_lock, fl_owner: %p\n", fl->fl_owner); /* set wait bit as appropriate, then make command as Ceph expects it*/ if (IS_GETLK(cmd)) @@ -264,7 +264,7 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl) err = ceph_lock_message(CEPH_LOCK_FCNTL, op, inode, lock_cmd, wait, fl); if (!err) { if (op == CEPH_MDS_OP_SETFILELOCK) { - dout("mds locked, locking locally"); + dout("mds locked, locking locally\n"); err = 
posix_lock_file(file, fl, NULL); if (err) { /* undo! This should only happen if @@ -272,7 +272,7 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl) * deadlock. */ ceph_lock_message(CEPH_LOCK_FCNTL, op, inode, CEPH_LOCK_UNLOCK, 0, fl); - dout("got %d on posix_lock_file, undid lock", + dout("got %d on posix_lock_file, undid lock\n", err); } } @@ -294,7 +294,7 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl) if (fl->fl_type & LOCK_MAND) return -EOPNOTSUPP; - dout("ceph_flock, fl_file: %p", fl->fl_file); + dout("ceph_flock, fl_file: %p\n", fl->fl_file); spin_lock(&ci->i_ceph_lock); if (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) { @@ -329,7 +329,7 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl) ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK, inode, CEPH_LOCK_UNLOCK, 0, fl); - dout("got %d on locks_lock_file_wait, undid lock", err); + dout("got %d on locks_lock_file_wait, undid lock\n", err); } } return err; @@ -356,7 +356,7 @@ void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count) ++(*flock_count); spin_unlock(&ctx->flc_lock); } - dout("counted %d flock locks and %d fcntl locks", + dout("counted %d flock locks and %d fcntl locks\n", *flock_count, *fcntl_count); } @@ -384,7 +384,7 @@ static int lock_to_ceph_filelock(struct file_lock *lock, cephlock->type = CEPH_LOCK_UNLOCK; break; default: - dout("Have unknown lock type %d", lock->fl_type); + dout("Have unknown lock type %d\n", lock->fl_type); err = -EINVAL; } @@ -407,7 +407,7 @@ int ceph_encode_locks_to_buffer(struct inode *inode, int seen_flock = 0; int l = 0; - dout("encoding %d flock and %d fcntl locks", num_flock_locks, + dout("encoding %d flock and %d fcntl locks\n", num_flock_locks, num_fcntl_locks); if (!ctx) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 2e8f90f96540..5ece2e6ad154 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -100,6 +100,26 @@ static int parse_reply_info_in(void **p, void *end, } else info->inline_version = CEPH_INLINE_NONE; + if (features & CEPH_FEATURE_MDS_QUOTA) { + u8 struct_v, struct_compat; + u32 struct_len; + + /* + * both struct_v and struct_compat are expected to be >= 1 + */ + ceph_decode_8_safe(p, end, struct_v, bad); + ceph_decode_8_safe(p, end, struct_compat, bad); + if (!struct_v || !struct_compat) + goto bad; + ceph_decode_32_safe(p, end, struct_len, bad); + ceph_decode_need(p, end, struct_len, bad); + ceph_decode_64_safe(p, end, info->max_bytes, bad); + ceph_decode_64_safe(p, end, info->max_files, bad); + } else { + info->max_bytes = 0; + info->max_files = 0; + } + info->pool_ns_len = 0; info->pool_ns_data = NULL; if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { @@ -384,7 +404,7 @@ static struct ceph_mds_session *get_session(struct ceph_mds_session *s) refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); return s; } else { - dout("mdsc get_session %p 0 -- FAIL", s); + dout("mdsc get_session %p 0 -- FAIL\n", s); return NULL; } } @@ -419,9 +439,10 @@ struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, static bool __have_session(struct ceph_mds_client *mdsc, int mds) { - if (mds >= mdsc->max_sessions) + if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) return false; - return mdsc->sessions[mds]; + else + return true; } static int __verify_registered_session(struct ceph_mds_client *mdsc, @@ -448,6 +469,25 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, s = kzalloc(sizeof(*s), GFP_NOFS); if (!s) return 
ERR_PTR(-ENOMEM); + + if (mds >= mdsc->max_sessions) { + int newmax = 1 << get_count_order(mds + 1); + struct ceph_mds_session **sa; + + dout("%s: realloc to %d\n", __func__, newmax); + sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); + if (!sa) + goto fail_realloc; + if (mdsc->sessions) { + memcpy(sa, mdsc->sessions, + mdsc->max_sessions * sizeof(void *)); + kfree(mdsc->sessions); + } + mdsc->sessions = sa; + mdsc->max_sessions = newmax; + } + + dout("%s: mds%d\n", __func__, mds); s->s_mdsc = mdsc; s->s_mds = mds; s->s_state = CEPH_MDS_SESSION_NEW; @@ -476,23 +516,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, INIT_LIST_HEAD(&s->s_cap_releases); INIT_LIST_HEAD(&s->s_cap_flushing); - dout("register_session mds%d\n", mds); - if (mds >= mdsc->max_sessions) { - int newmax = 1 << get_count_order(mds+1); - struct ceph_mds_session **sa; - - dout("register_session realloc to %d\n", newmax); - sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); - if (!sa) - goto fail_realloc; - if (mdsc->sessions) { - memcpy(sa, mdsc->sessions, - mdsc->max_sessions * sizeof(void *)); - kfree(mdsc->sessions); - } - mdsc->sessions = sa; - mdsc->max_sessions = newmax; - } mdsc->sessions[mds] = s; atomic_inc(&mdsc->num_sessions); refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ @@ -2531,10 +2554,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) * Otherwise we just have to return an ESTALE */ if (result == -ESTALE) { - dout("got ESTALE on request %llu", req->r_tid); + dout("got ESTALE on request %llu\n", req->r_tid); req->r_resend_mds = -1; if (req->r_direct_mode != USE_AUTH_MDS) { - dout("not using auth, setting for that now"); + dout("not using auth, setting for that now\n"); req->r_direct_mode = USE_AUTH_MDS; __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); @@ -2542,13 +2565,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) } else { int mds = __choose_mds(mdsc, req); if (mds >= 0 && mds != req->r_session->s_mds) { - dout("but auth changed, so resending"); + dout("but auth changed, so resending\n"); __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); goto out; } } - dout("have to return ESTALE on request %llu", req->r_tid); + dout("have to return ESTALE on request %llu\n", req->r_tid); } @@ -3470,13 +3493,12 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, } /* - * drop all leases (and dentry refs) in preparation for umount + * lock unlock sessions, to wait ongoing session activities */ -static void drop_leases(struct ceph_mds_client *mdsc) +static void lock_unlock_sessions(struct ceph_mds_client *mdsc) { int i; - dout("drop_leases\n"); mutex_lock(&mdsc->mutex); for (i = 0; i < mdsc->max_sessions; i++) { struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); @@ -3572,7 +3594,6 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) if (!mdsc) return -ENOMEM; mdsc->fsc = fsc; - fsc->mdsc = mdsc; mutex_init(&mdsc->mutex); mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); if (!mdsc->mdsmap) { @@ -3580,6 +3601,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) return -ENOMEM; } + fsc->mdsc = mdsc; init_completion(&mdsc->safe_umount_waiters); init_waitqueue_head(&mdsc->session_close_wq); INIT_LIST_HEAD(&mdsc->waiting_for_map); @@ -3587,6 +3609,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) atomic_set(&mdsc->num_sessions, 0); mdsc->max_sessions = 0; mdsc->stopping = 0; + atomic64_set(&mdsc->quotarealms_count, 0); mdsc->last_snap_seq = 0; 
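	/*
	 * An aside on the register_session() reallocation earlier in this
	 * file: newmax is the smallest power of two strictly greater than
	 * mds, since get_count_order(n) == ceil(log2(n)).  Worked examples:
	 * mds 0 grows the array to 1 slot, mds 1 to 2, mds 4 to 8 and
	 * mds 8 to 16, so the new sessions[mds] slot always exists after
	 * the copy.
	 */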
init_rwsem(&mdsc->snap_rwsem); mdsc->snap_realms = RB_ROOT; @@ -3660,7 +3683,7 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) dout("pre_umount\n"); mdsc->stopping = 1; - drop_leases(mdsc); + lock_unlock_sessions(mdsc); ceph_flush_dirty_caps(mdsc); wait_requests(mdsc); @@ -3858,6 +3881,9 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc) struct ceph_mds_client *mdsc = fsc->mdsc; dout("mdsc_destroy %p\n", mdsc); + if (!mdsc) + return; + /* flush out any connection work with references to us */ ceph_msgr_flush(); @@ -4077,6 +4103,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) case CEPH_MSG_CLIENT_LEASE: handle_lease(mdsc, s, msg); break; + case CEPH_MSG_CLIENT_QUOTA: + ceph_handle_quota(mdsc, s, msg); + break; default: pr_err("received unknown message type %d %s\n", type, diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 71e3b783ee6f..2ec3b5b35067 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -49,6 +49,8 @@ struct ceph_mds_reply_info_in { char *inline_data; u32 pool_ns_len; char *pool_ns_data; + u64 max_bytes; + u64 max_files; }; struct ceph_mds_reply_dir_entry { @@ -312,6 +314,8 @@ struct ceph_mds_client { int max_sessions; /* len of s_mds_sessions */ int stopping; /* true if shutting down */ + atomic64_t quotarealms_count; /* # realms with quota */ + /* * snap_rwsem will cover cap linkage into snaprealms, and * realm snap contexts. (later, we can do per-realm snap diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c new file mode 100644 index 000000000000..242bfa5c0539 --- /dev/null +++ b/fs/ceph/quota.c @@ -0,0 +1,361 @@ +// SPDX-License-Identifier: GPL-2.0 +/* + * quota.c - CephFS quota + * + * Copyright (C) 2017-2018 SUSE + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, see <http://www.gnu.org/licenses/>. 
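 *
 * The client-side enforcement entry points below are
 * ceph_quota_is_max_files_exceeded() (called from the create paths),
 * ceph_quota_is_max_bytes_exceeded() (write, truncate and fallocate paths)
 * and ceph_quota_is_max_bytes_approaching() (used to trigger an early caps
 * check as a limit gets close).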
+ */ + +#include <linux/statfs.h> + +#include "super.h" +#include "mds_client.h" + +void ceph_adjust_quota_realms_count(struct inode *inode, bool inc) +{ + struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + if (inc) + atomic64_inc(&mdsc->quotarealms_count); + else + atomic64_dec(&mdsc->quotarealms_count); +} + +static inline bool ceph_has_realms_with_quotas(struct inode *inode) +{ + struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + return atomic64_read(&mdsc->quotarealms_count) > 0; +} + +void ceph_handle_quota(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_msg *msg) +{ + struct super_block *sb = mdsc->fsc->sb; + struct ceph_mds_quota *h = msg->front.iov_base; + struct ceph_vino vino; + struct inode *inode; + struct ceph_inode_info *ci; + + if (msg->front.iov_len != sizeof(*h)) { + pr_err("%s corrupt message mds%d len %d\n", __func__, + session->s_mds, (int)msg->front.iov_len); + ceph_msg_dump(msg); + return; + } + + /* increment msg sequence number */ + mutex_lock(&session->s_mutex); + session->s_seq++; + mutex_unlock(&session->s_mutex); + + /* lookup inode */ + vino.ino = le64_to_cpu(h->ino); + vino.snap = CEPH_NOSNAP; + inode = ceph_find_inode(sb, vino); + if (!inode) { + pr_warn("Failed to find inode %llu\n", vino.ino); + return; + } + ci = ceph_inode(inode); + + spin_lock(&ci->i_ceph_lock); + ci->i_rbytes = le64_to_cpu(h->rbytes); + ci->i_rfiles = le64_to_cpu(h->rfiles); + ci->i_rsubdirs = le64_to_cpu(h->rsubdirs); + __ceph_update_quota(ci, le64_to_cpu(h->max_bytes), + le64_to_cpu(h->max_files)); + spin_unlock(&ci->i_ceph_lock); + + iput(inode); +} + +/* + * This function walks through the snaprealm for an inode and returns the + * ceph_snap_realm for the first snaprealm that has quotas set (either max_files + * or max_bytes). If the root is reached, return the root ceph_snap_realm + * instead. + * + * Note that the caller is responsible for calling ceph_put_snap_realm() on the + * returned realm. + */ +static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc, + struct inode *inode) +{ + struct ceph_inode_info *ci = NULL; + struct ceph_snap_realm *realm, *next; + struct inode *in; + bool has_quota; + + if (ceph_snap(inode) != CEPH_NOSNAP) + return NULL; + + realm = ceph_inode(inode)->i_snap_realm; + if (realm) + ceph_get_snap_realm(mdsc, realm); + else + pr_err_ratelimited("get_quota_realm: ino (%llx.%llx) " + "null i_snap_realm\n", ceph_vinop(inode)); + while (realm) { + spin_lock(&realm->inodes_with_caps_lock); + in = realm->inode ? 
igrab(realm->inode) : NULL; + spin_unlock(&realm->inodes_with_caps_lock); + if (!in) + break; + + ci = ceph_inode(in); + has_quota = __ceph_has_any_quota(ci); + iput(in); + + next = realm->parent; + if (has_quota || !next) + return realm; + + ceph_get_snap_realm(mdsc, next); + ceph_put_snap_realm(mdsc, realm); + realm = next; + } + if (realm) + ceph_put_snap_realm(mdsc, realm); + + return NULL; +} + +bool ceph_quota_is_same_realm(struct inode *old, struct inode *new) +{ + struct ceph_mds_client *mdsc = ceph_inode_to_client(old)->mdsc; + struct ceph_snap_realm *old_realm, *new_realm; + bool is_same; + + down_read(&mdsc->snap_rwsem); + old_realm = get_quota_realm(mdsc, old); + new_realm = get_quota_realm(mdsc, new); + is_same = (old_realm == new_realm); + up_read(&mdsc->snap_rwsem); + + if (old_realm) + ceph_put_snap_realm(mdsc, old_realm); + if (new_realm) + ceph_put_snap_realm(mdsc, new_realm); + + return is_same; +} + +enum quota_check_op { + QUOTA_CHECK_MAX_FILES_OP, /* check quota max_files limit */ + QUOTA_CHECK_MAX_BYTES_OP, /* check quota max_bytes limit */ + QUOTA_CHECK_MAX_BYTES_APPROACHING_OP /* check if quota max_bytes + limit is approaching */ +}; + +/* + * check_quota_exceeded() will walk up the snaprealm hierarchy and, for each + * realm, it will execute the quota check operation defined by the 'op' + * parameter. The snaprealm walk is interrupted if the quota check detects + * that the quota is exceeded or if the root inode is reached. + */ +static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op, + loff_t delta) +{ + struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct ceph_inode_info *ci; + struct ceph_snap_realm *realm, *next; + struct inode *in; + u64 max, rvalue; + bool exceeded = false; + + if (ceph_snap(inode) != CEPH_NOSNAP) + return false; + + down_read(&mdsc->snap_rwsem); + realm = ceph_inode(inode)->i_snap_realm; + if (realm) + ceph_get_snap_realm(mdsc, realm); + else + pr_err_ratelimited("check_quota_exceeded: ino (%llx.%llx) " + "null i_snap_realm\n", ceph_vinop(inode)); + while (realm) { + spin_lock(&realm->inodes_with_caps_lock); + in = realm->inode ? 
igrab(realm->inode) : NULL; + spin_unlock(&realm->inodes_with_caps_lock); + if (!in) + break; + + ci = ceph_inode(in); + spin_lock(&ci->i_ceph_lock); + if (op == QUOTA_CHECK_MAX_FILES_OP) { + max = ci->i_max_files; + rvalue = ci->i_rfiles + ci->i_rsubdirs; + } else { + max = ci->i_max_bytes; + rvalue = ci->i_rbytes; + } + spin_unlock(&ci->i_ceph_lock); + switch (op) { + case QUOTA_CHECK_MAX_FILES_OP: + exceeded = (max && (rvalue >= max)); + break; + case QUOTA_CHECK_MAX_BYTES_OP: + exceeded = (max && (rvalue + delta > max)); + break; + case QUOTA_CHECK_MAX_BYTES_APPROACHING_OP: + if (max) { + if (rvalue >= max) + exceeded = true; + else { + /* + * when we're writing more than 1/16th + * of the available space + */ + exceeded = + (((max - rvalue) >> 4) < delta); + } + } + break; + default: + /* Shouldn't happen */ + pr_warn("Invalid quota check op (%d)\n", op); + exceeded = true; /* Just break the loop */ + } + iput(in); + + next = realm->parent; + if (exceeded || !next) + break; + ceph_get_snap_realm(mdsc, next); + ceph_put_snap_realm(mdsc, realm); + realm = next; + } + ceph_put_snap_realm(mdsc, realm); + up_read(&mdsc->snap_rwsem); + + return exceeded; +} + +/* + * ceph_quota_is_max_files_exceeded - check if we can create a new file + * @inode: directory where a new file is being created + * + * This function returns true if creating a new file would exceed the + * max_files quota of some ancestor directory. It is necessary to walk + * through the snaprealm hierarchy (until the FS root) to check all realms + * with quotas set. + */ +bool ceph_quota_is_max_files_exceeded(struct inode *inode) +{ + if (!ceph_has_realms_with_quotas(inode)) + return false; + + WARN_ON(!S_ISDIR(inode->i_mode)); + + return check_quota_exceeded(inode, QUOTA_CHECK_MAX_FILES_OP, 0); +} + +/* + * ceph_quota_is_max_bytes_exceeded - check if we can write to a file + * @inode: inode being written + * @newsize: new size if write succeeds + * + * This function returns true if growing the file to @newsize would exceed a + * max_bytes quota; it returns false otherwise. + */ +bool ceph_quota_is_max_bytes_exceeded(struct inode *inode, loff_t newsize) +{ + loff_t size = i_size_read(inode); + + if (!ceph_has_realms_with_quotas(inode)) + return false; + + /* return immediately if we're decreasing file size */ + if (newsize <= size) + return false; + + return check_quota_exceeded(inode, QUOTA_CHECK_MAX_BYTES_OP, (newsize - size)); +} + +/* + * ceph_quota_is_max_bytes_approaching - check if we're reaching max_bytes + * @inode: inode being written + * @newsize: new size if write succeeds + * + * This function returns true if the new file size @newsize will be consuming + * more than 1/16th of the available quota space; it returns false otherwise. + */ +bool ceph_quota_is_max_bytes_approaching(struct inode *inode, loff_t newsize) +{ + loff_t size = ceph_inode(inode)->i_reported_size; + + if (!ceph_has_realms_with_quotas(inode)) + return false; + + /* return immediately if we're decreasing file size */ + if (newsize <= size) + return false; + + return check_quota_exceeded(inode, QUOTA_CHECK_MAX_BYTES_APPROACHING_OP, + (newsize - size)); +} + +/* + * ceph_quota_update_statfs - if root has quota, update statfs with quota status + * @fsc: filesystem client instance + * @buf: statfs to update + * + * If the mounted filesystem root has max_bytes quota set, update the filesystem + * statistics with the quota status. + * + * This function returns true if the stats have been updated, false otherwise. 
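 *
 * The expected calling pattern, as wired into ceph_statfs() later in this
 * series (sketch only):
 *
 *	if (ceph_test_mount_opt(fsc, NOQUOTADF) ||
 *	    !ceph_quota_update_statfs(fsc, buf))
 *		... fill buf from the cluster-wide statistics ...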
+ */ +bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, struct kstatfs *buf) +{ + struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_inode_info *ci; + struct ceph_snap_realm *realm; + struct inode *in; + u64 total = 0, used, free; + bool is_updated = false; + + down_read(&mdsc->snap_rwsem); + realm = get_quota_realm(mdsc, d_inode(fsc->sb->s_root)); + up_read(&mdsc->snap_rwsem); + if (!realm) + return false; + + spin_lock(&realm->inodes_with_caps_lock); + in = realm->inode ? igrab(realm->inode) : NULL; + spin_unlock(&realm->inodes_with_caps_lock); + if (in) { + ci = ceph_inode(in); + spin_lock(&ci->i_ceph_lock); + if (ci->i_max_bytes) { + total = ci->i_max_bytes >> CEPH_BLOCK_SHIFT; + used = ci->i_rbytes >> CEPH_BLOCK_SHIFT; + /* It is possible for a quota to be exceeded. + * Report 'zero' in that case + */ + free = total > used ? total - used : 0; + } + spin_unlock(&ci->i_ceph_lock); + if (total) { + buf->f_blocks = total; + buf->f_bfree = free; + buf->f_bavail = free; + is_updated = true; + } + iput(in); + } + ceph_put_snap_realm(mdsc, realm); + + return is_updated; +} + diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 07cf95e6413d..041c27ea8de1 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -931,6 +931,8 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps); ci->i_snap_realm = realm; + if (realm->ino == ci->i_vino.ino) + realm->inode = inode; spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&ci->i_ceph_lock); diff --git a/fs/ceph/super.c b/fs/ceph/super.c index fb2bc9c15a23..b33082e6878f 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -76,9 +76,18 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf) */ buf->f_bsize = 1 << CEPH_BLOCK_SHIFT; buf->f_frsize = 1 << CEPH_BLOCK_SHIFT; - buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10); - buf->f_bfree = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10); - buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10); + + /* + * By default use root quota for stats; fallback to overall filesystem + * usage if using 'noquotadf' mount option or if the root dir doesn't + * have max_bytes quota set. 
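	 *
	 * Worked example, assuming the usual 4 MiB CEPH_BLOCK
	 * (CEPH_BLOCK_SHIFT of 22) used for f_bsize above: a root quota of
	 * max_bytes = 1 TiB is reported as f_blocks = 2^40 >> 22 = 262144
	 * blocks, with f_bfree and f_bavail derived from the realm's rbytes
	 * in the same 4 MiB units.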
+ */ + if (ceph_test_mount_opt(fsc, NOQUOTADF) || + !ceph_quota_update_statfs(fsc, buf)) { + buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10); + buf->f_bfree = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10); + buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10); + } buf->f_files = le64_to_cpu(st.num_objects); buf->f_ffree = -1; @@ -151,6 +160,8 @@ enum { Opt_acl, #endif Opt_noacl, + Opt_quotadf, + Opt_noquotadf, }; static match_table_t fsopt_tokens = { @@ -187,6 +198,8 @@ static match_table_t fsopt_tokens = { {Opt_acl, "acl"}, #endif {Opt_noacl, "noacl"}, + {Opt_quotadf, "quotadf"}, + {Opt_noquotadf, "noquotadf"}, {-1, NULL} }; @@ -314,13 +327,16 @@ static int parse_fsopt_token(char *c, void *private) break; case Opt_fscache: fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE; + kfree(fsopt->fscache_uniq); + fsopt->fscache_uniq = NULL; break; case Opt_nofscache: fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE; + kfree(fsopt->fscache_uniq); + fsopt->fscache_uniq = NULL; break; case Opt_poolperm: fsopt->flags &= ~CEPH_MOUNT_OPT_NOPOOLPERM; - printk ("pool perm"); break; case Opt_nopoolperm: fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM; @@ -331,6 +347,12 @@ static int parse_fsopt_token(char *c, void *private) case Opt_norequire_active_mds: fsopt->flags |= CEPH_MOUNT_OPT_MOUNTWAIT; break; + case Opt_quotadf: + fsopt->flags &= ~CEPH_MOUNT_OPT_NOQUOTADF; + break; + case Opt_noquotadf: + fsopt->flags |= CEPH_MOUNT_OPT_NOQUOTADF; + break; #ifdef CONFIG_CEPH_FS_POSIX_ACL case Opt_acl: fsopt->sb_flags |= SB_POSIXACL; @@ -513,13 +535,12 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0) seq_puts(m, ",nodcache"); if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) { - if (fsopt->fscache_uniq) - seq_printf(m, ",fsc=%s", fsopt->fscache_uniq); - else - seq_puts(m, ",fsc"); + seq_show_option(m, "fsc", fsopt->fscache_uniq); } if (fsopt->flags & CEPH_MOUNT_OPT_NOPOOLPERM) seq_puts(m, ",nopoolperm"); + if (fsopt->flags & CEPH_MOUNT_OPT_NOQUOTADF) + seq_puts(m, ",noquotadf"); #ifdef CONFIG_CEPH_FS_POSIX_ACL if (fsopt->sb_flags & SB_POSIXACL) @@ -529,7 +550,7 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) #endif if (fsopt->mds_namespace) - seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace); + seq_show_option(m, "mds_namespace", fsopt->mds_namespace); if (fsopt->wsize) seq_printf(m, ",wsize=%d", fsopt->wsize); if (fsopt->rsize != CEPH_MAX_READ_SIZE) @@ -679,6 +700,7 @@ struct kmem_cache *ceph_cap_cachep; struct kmem_cache *ceph_cap_flush_cachep; struct kmem_cache *ceph_dentry_cachep; struct kmem_cache *ceph_file_cachep; +struct kmem_cache *ceph_dir_file_cachep; static void ceph_inode_init_once(void *foo) { @@ -698,8 +720,7 @@ static int __init init_caches(void) if (!ceph_inode_cachep) return -ENOMEM; - ceph_cap_cachep = KMEM_CACHE(ceph_cap, - SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); + ceph_cap_cachep = KMEM_CACHE(ceph_cap, SLAB_MEM_SPREAD); if (!ceph_cap_cachep) goto bad_cap; ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush, @@ -716,6 +737,10 @@ static int __init init_caches(void) if (!ceph_file_cachep) goto bad_file; + ceph_dir_file_cachep = KMEM_CACHE(ceph_dir_file_info, SLAB_MEM_SPREAD); + if (!ceph_dir_file_cachep) + goto bad_dir_file; + error = ceph_fscache_register(); if (error) goto bad_fscache; @@ -723,6 +748,8 @@ static int __init init_caches(void) return 0; bad_fscache: + kmem_cache_destroy(ceph_dir_file_cachep); +bad_dir_file: kmem_cache_destroy(ceph_file_cachep); bad_file: 
kmem_cache_destroy(ceph_dentry_cachep); @@ -748,6 +775,7 @@ static void destroy_caches(void) kmem_cache_destroy(ceph_cap_flush_cachep); kmem_cache_destroy(ceph_dentry_cachep); kmem_cache_destroy(ceph_file_cachep); + kmem_cache_destroy(ceph_dir_file_cachep); ceph_fscache_unregister(); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 1c2086e0fec2..a7077a0c989f 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -39,6 +39,7 @@ #define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */ #define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */ #define CEPH_MOUNT_OPT_MOUNTWAIT (1<<12) /* mount waits if no mds is up */ +#define CEPH_MOUNT_OPT_NOQUOTADF (1<<13) /* no root dir quota in statfs */ #define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE @@ -310,6 +311,9 @@ struct ceph_inode_info { u64 i_rbytes, i_rfiles, i_rsubdirs; u64 i_files, i_subdirs; + /* quotas */ + u64 i_max_bytes, i_max_files; + struct rb_root i_fragtree; int i_fragtree_nsplits; struct mutex i_fragtree_mutex; @@ -671,6 +675,10 @@ struct ceph_file_info { spinlock_t rw_contexts_lock; struct list_head rw_contexts; +}; + +struct ceph_dir_file_info { + struct ceph_file_info file_info; /* readdir: position within the dir */ u32 frag; @@ -748,6 +756,7 @@ struct ceph_readdir_cache_control { */ struct ceph_snap_realm { u64 ino; + struct inode *inode; atomic_t nref; struct rb_node node; @@ -1066,4 +1075,37 @@ extern int ceph_locks_to_pagelist(struct ceph_filelock *flocks, extern int ceph_fs_debugfs_init(struct ceph_fs_client *client); extern void ceph_fs_debugfs_cleanup(struct ceph_fs_client *client); +/* quota.c */ +static inline bool __ceph_has_any_quota(struct ceph_inode_info *ci) +{ + return ci->i_max_files || ci->i_max_bytes; +} + +extern void ceph_adjust_quota_realms_count(struct inode *inode, bool inc); + +static inline void __ceph_update_quota(struct ceph_inode_info *ci, + u64 max_bytes, u64 max_files) +{ + bool had_quota, has_quota; + had_quota = __ceph_has_any_quota(ci); + ci->i_max_bytes = max_bytes; + ci->i_max_files = max_files; + has_quota = __ceph_has_any_quota(ci); + + if (had_quota != has_quota) + ceph_adjust_quota_realms_count(&ci->vfs_inode, has_quota); +} + +extern void ceph_handle_quota(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_msg *msg); +extern bool ceph_quota_is_max_files_exceeded(struct inode *inode); +extern bool ceph_quota_is_same_realm(struct inode *old, struct inode *new); +extern bool ceph_quota_is_max_bytes_exceeded(struct inode *inode, + loff_t newlen); +extern bool ceph_quota_is_max_bytes_approaching(struct inode *inode, + loff_t newlen); +extern bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, + struct kstatfs *buf); + #endif /* _FS_CEPH_SUPER_H */ diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index e1c4e0b12b4c..7e72348639e4 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -224,6 +224,31 @@ static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val, (long)ci->i_rctime.tv_nsec); } +/* quotas */ + +static bool ceph_vxattrcb_quota_exists(struct ceph_inode_info *ci) +{ + return (ci->i_max_files || ci->i_max_bytes); +} + +static size_t ceph_vxattrcb_quota(struct ceph_inode_info *ci, char *val, + size_t size) +{ + return snprintf(val, size, "max_bytes=%llu max_files=%llu", + ci->i_max_bytes, ci->i_max_files); +} + +static size_t ceph_vxattrcb_quota_max_bytes(struct ceph_inode_info *ci, + char *val, size_t size) +{ + return snprintf(val, size, "%llu", ci->i_max_bytes); +} + +static size_t 
ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci, + char *val, size_t size) +{ + return snprintf(val, size, "%llu", ci->i_max_files); +} #define CEPH_XATTR_NAME(_type, _name) XATTR_CEPH_PREFIX #_type "." #_name #define CEPH_XATTR_NAME2(_type, _name, _name2) \ @@ -247,6 +272,15 @@ static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val, .hidden = true, \ .exists_cb = ceph_vxattrcb_layout_exists, \ } +#define XATTR_QUOTA_FIELD(_type, _name) \ + { \ + .name = CEPH_XATTR_NAME(_type, _name), \ + .name_size = sizeof(CEPH_XATTR_NAME(_type, _name)), \ + .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \ + .readonly = false, \ + .hidden = true, \ + .exists_cb = ceph_vxattrcb_quota_exists, \ + } static struct ceph_vxattr ceph_dir_vxattrs[] = { { @@ -270,6 +304,16 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { XATTR_NAME_CEPH(dir, rsubdirs), XATTR_NAME_CEPH(dir, rbytes), XATTR_NAME_CEPH(dir, rctime), + { + .name = "ceph.quota", + .name_size = sizeof("ceph.quota"), + .getxattr_cb = ceph_vxattrcb_quota, + .readonly = false, + .hidden = true, + .exists_cb = ceph_vxattrcb_quota_exists, + }, + XATTR_QUOTA_FIELD(quota, max_bytes), + XATTR_QUOTA_FIELD(quota, max_files), { .name = NULL, 0 } /* Required table terminator */ }; static size_t ceph_dir_vxattrs_name_size; /* total size of all names */ diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index 59042d5ac520..3901927cf6a0 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -204,6 +204,7 @@ DEFINE_CEPH_FEATURE_DEPRECATED(63, 1, RESERVED_BROKEN, LUMINOUS) // client-facin CEPH_FEATURE_OSD_PRIMARY_AFFINITY | \ CEPH_FEATURE_MSGR_KEEPALIVE2 | \ CEPH_FEATURE_OSD_POOLRESEND | \ + CEPH_FEATURE_MDS_QUOTA | \ CEPH_FEATURE_CRUSH_V4 | \ CEPH_FEATURE_NEW_OSDOP_ENCODING | \ CEPH_FEATURE_SERVER_JEWEL | \ diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 88dd51381aaf..7ecfc88314d8 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -134,6 +134,7 @@ struct ceph_dir_layout { #define CEPH_MSG_CLIENT_LEASE 0x311 #define CEPH_MSG_CLIENT_SNAP 0x312 #define CEPH_MSG_CLIENT_CAPRELEASE 0x313 +#define CEPH_MSG_CLIENT_QUOTA 0x314 /* pool ops */ #define CEPH_MSG_POOLOP_REPLY 48 @@ -807,4 +808,20 @@ struct ceph_mds_snap_realm { } __attribute__ ((packed)); /* followed by my snap list, then prior parent snap list */ +/* + * quotas + */ +struct ceph_mds_quota { + __le64 ino; /* ino */ + struct ceph_timespec rctime; + __le64 rbytes; /* dir stats */ + __le64 rfiles; + __le64 rsubdirs; + __u8 struct_v; /* compat */ + __u8 struct_compat; + __le32 struct_len; + __le64 max_bytes; /* quota max. bytes */ + __le64 max_files; /* quota max. 
files */ +} __attribute__ ((packed)); + #endif diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index c2ec44cf5098..49c93b9308d7 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -262,6 +262,7 @@ extern struct kmem_cache *ceph_cap_cachep; extern struct kmem_cache *ceph_cap_flush_cachep; extern struct kmem_cache *ceph_dentry_cachep; extern struct kmem_cache *ceph_file_cachep; +extern struct kmem_cache *ceph_dir_file_cachep; /* ceph_common.c */ extern bool libceph_compatible(void *data); diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index ead9d85f1c11..c7dfcb8a1fb2 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -76,6 +76,7 @@ enum ceph_msg_data_type { #ifdef CONFIG_BLOCK CEPH_MSG_DATA_BIO, /* data source/destination is a bio list */ #endif /* CONFIG_BLOCK */ + CEPH_MSG_DATA_BVECS, /* data source/destination is a bio_vec array */ }; static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) @@ -87,22 +88,106 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) #ifdef CONFIG_BLOCK case CEPH_MSG_DATA_BIO: #endif /* CONFIG_BLOCK */ + case CEPH_MSG_DATA_BVECS: return true; default: return false; } } +#ifdef CONFIG_BLOCK + +struct ceph_bio_iter { + struct bio *bio; + struct bvec_iter iter; +}; + +#define __ceph_bio_iter_advance_step(it, n, STEP) do { \ + unsigned int __n = (n), __cur_n; \ + \ + while (__n) { \ + BUG_ON(!(it)->iter.bi_size); \ + __cur_n = min((it)->iter.bi_size, __n); \ + (void)(STEP); \ + bio_advance_iter((it)->bio, &(it)->iter, __cur_n); \ + if (!(it)->iter.bi_size && (it)->bio->bi_next) { \ + dout("__ceph_bio_iter_advance_step next bio\n"); \ + (it)->bio = (it)->bio->bi_next; \ + (it)->iter = (it)->bio->bi_iter; \ + } \ + __n -= __cur_n; \ + } \ +} while (0) + +/* + * Advance @it by @n bytes. + */ +#define ceph_bio_iter_advance(it, n) \ + __ceph_bio_iter_advance_step(it, n, 0) + +/* + * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec. + */ +#define ceph_bio_iter_advance_step(it, n, BVEC_STEP) \ + __ceph_bio_iter_advance_step(it, n, ({ \ + struct bio_vec bv; \ + struct bvec_iter __cur_iter; \ + \ + __cur_iter = (it)->iter; \ + __cur_iter.bi_size = __cur_n; \ + __bio_for_each_segment(bv, (it)->bio, __cur_iter, __cur_iter) \ + (void)(BVEC_STEP); \ + })) + +#endif /* CONFIG_BLOCK */ + +struct ceph_bvec_iter { + struct bio_vec *bvecs; + struct bvec_iter iter; +}; + +#define __ceph_bvec_iter_advance_step(it, n, STEP) do { \ + BUG_ON((n) > (it)->iter.bi_size); \ + (void)(STEP); \ + bvec_iter_advance((it)->bvecs, &(it)->iter, (n)); \ +} while (0) + +/* + * Advance @it by @n bytes. + */ +#define ceph_bvec_iter_advance(it, n) \ + __ceph_bvec_iter_advance_step(it, n, 0) + +/* + * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec. 
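 *
 * For instance, a caller could checksum the next @n bytes while advancing
 * (an illustrative sketch; it assumes the pages are lowmem/mapped so that
 * page_address() is usable):
 *
 *	u32 crc = 0;
 *
 *	ceph_bvec_iter_advance_step(&it, n, ({
 *		crc = crc32c(crc, page_address(bv.bv_page) + bv.bv_offset,
 *			     bv.bv_len);
 *	}));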
+ */ +#define ceph_bvec_iter_advance_step(it, n, BVEC_STEP) \ + __ceph_bvec_iter_advance_step(it, n, ({ \ + struct bio_vec bv; \ + struct bvec_iter __cur_iter; \ + \ + __cur_iter = (it)->iter; \ + __cur_iter.bi_size = (n); \ + for_each_bvec(bv, (it)->bvecs, __cur_iter, __cur_iter) \ + (void)(BVEC_STEP); \ + })) + +#define ceph_bvec_iter_shorten(it, n) do { \ + BUG_ON((n) > (it)->iter.bi_size); \ + (it)->iter.bi_size = (n); \ +} while (0) + struct ceph_msg_data { struct list_head links; /* ceph_msg->data */ enum ceph_msg_data_type type; union { #ifdef CONFIG_BLOCK struct { - struct bio *bio; - size_t bio_length; + struct ceph_bio_iter bio_pos; + u32 bio_length; }; #endif /* CONFIG_BLOCK */ + struct ceph_bvec_iter bvec_pos; struct { struct page **pages; /* NOT OWNER. */ size_t length; /* total # bytes */ @@ -122,11 +207,9 @@ struct ceph_msg_data_cursor { bool need_crc; /* crc update needed */ union { #ifdef CONFIG_BLOCK - struct { /* bio */ - struct bio *bio; /* bio from list */ - struct bvec_iter bvec_iter; - }; + struct ceph_bio_iter bio_iter; #endif /* CONFIG_BLOCK */ + struct bvec_iter bvec_iter; struct { /* pages */ unsigned int page_offset; /* offset in page */ unsigned short page_index; /* index in array */ @@ -290,9 +373,11 @@ extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg, struct ceph_pagelist *pagelist); #ifdef CONFIG_BLOCK -extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio, - size_t length); +void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, + u32 length); #endif /* CONFIG_BLOCK */ +void ceph_msg_data_add_bvecs(struct ceph_msg *msg, + struct ceph_bvec_iter *bvec_pos); extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, bool can_fail); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 52fb37d1c2a5..528ccc943cee 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -57,6 +57,7 @@ enum ceph_osd_data_type { #ifdef CONFIG_BLOCK CEPH_OSD_DATA_TYPE_BIO, #endif /* CONFIG_BLOCK */ + CEPH_OSD_DATA_TYPE_BVECS, }; struct ceph_osd_data { @@ -72,10 +73,11 @@ struct ceph_osd_data { struct ceph_pagelist *pagelist; #ifdef CONFIG_BLOCK struct { - struct bio *bio; /* list of bios */ - size_t bio_length; /* total in list */ + struct ceph_bio_iter bio_pos; + u32 bio_length; }; #endif /* CONFIG_BLOCK */ + struct ceph_bvec_iter bvec_pos; }; }; @@ -405,10 +407,14 @@ extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *, unsigned int which, struct ceph_pagelist *pagelist); #ifdef CONFIG_BLOCK -extern void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *, - unsigned int which, - struct bio *bio, size_t bio_length); +void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, + unsigned int which, + struct ceph_bio_iter *bio_pos, + u32 bio_length); #endif /* CONFIG_BLOCK */ +void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req, + unsigned int which, + struct ceph_bvec_iter *bvec_pos); extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *, unsigned int which, @@ -418,6 +424,9 @@ extern void osd_req_op_cls_request_data_pages(struct ceph_osd_request *, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages); +void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req, + unsigned int which, + struct bio_vec *bvecs, u32 bytes); extern void 
osd_req_op_cls_response_data_pages(struct ceph_osd_request *, unsigned int which, struct page **pages, u64 length, diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index d41fad99c0fa..e71fb222c7c3 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -5,7 +5,6 @@ #include <linux/rbtree.h> #include <linux/ceph/types.h> #include <linux/ceph/decode.h> -#include <linux/ceph/ceph_fs.h> #include <linux/crush/crush.h> /* @@ -280,11 +279,6 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting, const struct ceph_osds *new_acting, bool any_change); -/* calculate mapping of a file extent to an object */ -extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, - u64 off, u64 len, - u64 *bno, u64 *oxoff, u64 *oxlen); - int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, const struct ceph_object_id *oid, const struct ceph_object_locator *oloc, diff --git a/include/linux/ceph/striper.h b/include/linux/ceph/striper.h new file mode 100644 index 000000000000..cbd0d24b7148 --- /dev/null +++ b/include/linux/ceph/striper.h @@ -0,0 +1,69 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +#ifndef _LINUX_CEPH_STRIPER_H +#define _LINUX_CEPH_STRIPER_H + +#include <linux/list.h> +#include <linux/types.h> + +struct ceph_file_layout; + +void ceph_calc_file_object_mapping(struct ceph_file_layout *l, + u64 off, u64 len, + u64 *objno, u64 *objoff, u32 *xlen); + +struct ceph_object_extent { + struct list_head oe_item; + u64 oe_objno; + u64 oe_off; + u64 oe_len; +}; + +static inline void ceph_object_extent_init(struct ceph_object_extent *ex) +{ + INIT_LIST_HEAD(&ex->oe_item); +} + +/* + * Called for each mapped stripe unit. + * + * @bytes: number of bytes mapped, i.e. the minimum of the full length + * requested (file extent length) or the remainder of the stripe + * unit within an object + */ +typedef void (*ceph_object_extent_fn_t)(struct ceph_object_extent *ex, + u32 bytes, void *arg); + +int ceph_file_to_extents(struct ceph_file_layout *l, u64 off, u64 len, + struct list_head *object_extents, + struct ceph_object_extent *alloc_fn(void *arg), + void *alloc_arg, + ceph_object_extent_fn_t action_fn, + void *action_arg); +int ceph_iterate_extents(struct ceph_file_layout *l, u64 off, u64 len, + struct list_head *object_extents, + ceph_object_extent_fn_t action_fn, + void *action_arg); + +struct ceph_file_extent { + u64 fe_off; + u64 fe_len; +}; + +static inline u64 ceph_file_extents_bytes(struct ceph_file_extent *file_extents, + u32 num_file_extents) +{ + u64 bytes = 0; + u32 i; + + for (i = 0; i < num_file_extents; i++) + bytes += file_extents[i].fe_len; + + return bytes; +} + +int ceph_extent_to_file(struct ceph_file_layout *l, + u64 objno, u64 objoff, u64 objlen, + struct ceph_file_extent **file_extents, + u32 *num_file_extents); + +#endif diff --git a/net/ceph/Makefile b/net/ceph/Makefile index b4bded4b5396..12bf49772d24 100644 --- a/net/ceph/Makefile +++ b/net/ceph/Makefile @@ -8,6 +8,7 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \ mon_client.o \ cls_lock_client.o \ osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \ + striper.o \ debugfs.o \ auth.o auth_none.o \ crypto.o armor.o \ diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 4adf07826f4a..584fdbef2088 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -72,6 +72,7 @@ const char *ceph_msg_type_name(int type) case CEPH_MSG_MON_GET_VERSION: return "mon_get_version"; case CEPH_MSG_MON_GET_VERSION_REPLY: return 
"mon_get_version_reply"; case CEPH_MSG_MDS_MAP: return "mds_map"; + case CEPH_MSG_FS_MAP_USER: return "fs_map_user"; case CEPH_MSG_CLIENT_SESSION: return "client_session"; case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect"; case CEPH_MSG_CLIENT_REQUEST: return "client_request"; @@ -79,8 +80,13 @@ const char *ceph_msg_type_name(int type) case CEPH_MSG_CLIENT_REPLY: return "client_reply"; case CEPH_MSG_CLIENT_CAPS: return "client_caps"; case CEPH_MSG_CLIENT_CAPRELEASE: return "client_cap_release"; + case CEPH_MSG_CLIENT_QUOTA: return "client_quota"; case CEPH_MSG_CLIENT_SNAP: return "client_snap"; case CEPH_MSG_CLIENT_LEASE: return "client_lease"; + case CEPH_MSG_POOLOP_REPLY: return "poolop_reply"; + case CEPH_MSG_POOLOP: return "poolop"; + case CEPH_MSG_MON_COMMAND: return "mon_command"; + case CEPH_MSG_MON_COMMAND_ACK: return "mon_command_ack"; case CEPH_MSG_OSD_MAP: return "osd_map"; case CEPH_MSG_OSD_OP: return "osd_op"; case CEPH_MSG_OSD_OPREPLY: return "osd_opreply"; @@ -217,7 +223,7 @@ static int parse_fsid(const char *str, struct ceph_fsid *fsid) if (i == 16) err = 0; - dout("parse_fsid ret %d got fsid %pU", err, fsid); + dout("parse_fsid ret %d got fsid %pU\n", err, fsid); return err; } diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c index bf9d079cbafd..02172c408ff2 100644 --- a/net/ceph/crypto.c +++ b/net/ceph/crypto.c @@ -347,10 +347,12 @@ struct key_type key_type_ceph = { .destroy = ceph_key_destroy, }; -int ceph_crypto_init(void) { +int __init ceph_crypto_init(void) +{ return register_key_type(&key_type_ceph); } -void ceph_crypto_shutdown(void) { +void ceph_crypto_shutdown(void) +{ unregister_key_type(&key_type_ceph); } diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 1eef6806aa1a..02952605d121 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -389,7 +389,7 @@ CEPH_DEFINE_SHOW_FUNC(monc_show) CEPH_DEFINE_SHOW_FUNC(osdc_show) CEPH_DEFINE_SHOW_FUNC(client_options_show) -int ceph_debugfs_init(void) +int __init ceph_debugfs_init(void) { ceph_debugfs_dir = debugfs_create_dir("ceph", NULL); if (!ceph_debugfs_dir) @@ -418,7 +418,7 @@ int ceph_debugfs_client_init(struct ceph_client *client) goto out; client->monc.debugfs_file = debugfs_create_file("monc", - 0600, + 0400, client->debugfs_dir, client, &monc_show_fops); @@ -426,7 +426,7 @@ int ceph_debugfs_client_init(struct ceph_client *client) goto out; client->osdc.debugfs_file = debugfs_create_file("osdc", - 0600, + 0400, client->debugfs_dir, client, &osdc_show_fops); @@ -434,7 +434,7 @@ int ceph_debugfs_client_init(struct ceph_client *client) goto out; client->debugfs_monmap = debugfs_create_file("monmap", - 0600, + 0400, client->debugfs_dir, client, &monmap_show_fops); @@ -442,7 +442,7 @@ int ceph_debugfs_client_init(struct ceph_client *client) goto out; client->debugfs_osdmap = debugfs_create_file("osdmap", - 0600, + 0400, client->debugfs_dir, client, &osdmap_show_fops); @@ -450,7 +450,7 @@ int ceph_debugfs_client_init(struct ceph_client *client) goto out; client->debugfs_options = debugfs_create_file("client_options", - 0600, + 0400, client->debugfs_dir, client, &client_options_show_fops); @@ -477,7 +477,7 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client) #else /* CONFIG_DEBUG_FS */ -int ceph_debugfs_init(void) +int __init ceph_debugfs_init(void) { return 0; } @@ -496,6 +496,3 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client) } #endif /* CONFIG_DEBUG_FS */ - -EXPORT_SYMBOL(ceph_debugfs_init); -EXPORT_SYMBOL(ceph_debugfs_cleanup); diff --git a/net/ceph/messenger.c 
b/net/ceph/messenger.c index 8a4d3758030b..fcb40c12b1f8 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -277,7 +277,7 @@ static void _ceph_msgr_exit(void) ceph_msgr_slab_exit(); } -int ceph_msgr_init(void) +int __init ceph_msgr_init(void) { if (ceph_msgr_slab_init()) return -ENOMEM; @@ -299,7 +299,6 @@ int ceph_msgr_init(void) return -ENOMEM; } -EXPORT_SYMBOL(ceph_msgr_init); void ceph_msgr_exit(void) { @@ -307,7 +306,6 @@ void ceph_msgr_exit(void) _ceph_msgr_exit(); } -EXPORT_SYMBOL(ceph_msgr_exit); void ceph_msgr_flush(void) { @@ -839,93 +837,112 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor, size_t length) { struct ceph_msg_data *data = cursor->data; - struct bio *bio; + struct ceph_bio_iter *it = &cursor->bio_iter; - BUG_ON(data->type != CEPH_MSG_DATA_BIO); + cursor->resid = min_t(size_t, length, data->bio_length); + *it = data->bio_pos; + if (cursor->resid < it->iter.bi_size) + it->iter.bi_size = cursor->resid; - bio = data->bio; - BUG_ON(!bio); - - cursor->resid = min(length, data->bio_length); - cursor->bio = bio; - cursor->bvec_iter = bio->bi_iter; - cursor->last_piece = - cursor->resid <= bio_iter_len(bio, cursor->bvec_iter); + BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter)); + cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter); } static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor, size_t *page_offset, size_t *length) { - struct ceph_msg_data *data = cursor->data; - struct bio *bio; - struct bio_vec bio_vec; - - BUG_ON(data->type != CEPH_MSG_DATA_BIO); - - bio = cursor->bio; - BUG_ON(!bio); - - bio_vec = bio_iter_iovec(bio, cursor->bvec_iter); - - *page_offset = (size_t) bio_vec.bv_offset; - BUG_ON(*page_offset >= PAGE_SIZE); - if (cursor->last_piece) /* pagelist offset is always 0 */ - *length = cursor->resid; - else - *length = (size_t) bio_vec.bv_len; - BUG_ON(*length > cursor->resid); - BUG_ON(*page_offset + *length > PAGE_SIZE); + struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio, + cursor->bio_iter.iter); - return bio_vec.bv_page; + *page_offset = bv.bv_offset; + *length = bv.bv_len; + return bv.bv_page; } static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) { - struct bio *bio; - struct bio_vec bio_vec; + struct ceph_bio_iter *it = &cursor->bio_iter; - BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO); + BUG_ON(bytes > cursor->resid); + BUG_ON(bytes > bio_iter_len(it->bio, it->iter)); + cursor->resid -= bytes; + bio_advance_iter(it->bio, &it->iter, bytes); - bio = cursor->bio; - BUG_ON(!bio); + if (!cursor->resid) { + BUG_ON(!cursor->last_piece); + return false; /* no more data */ + } - bio_vec = bio_iter_iovec(bio, cursor->bvec_iter); + if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done)) + return false; /* more bytes to process in this segment */ - /* Advance the cursor offset */ + if (!it->iter.bi_size) { + it->bio = it->bio->bi_next; + it->iter = it->bio->bi_iter; + if (cursor->resid < it->iter.bi_size) + it->iter.bi_size = cursor->resid; + } - BUG_ON(cursor->resid < bytes); - cursor->resid -= bytes; + BUG_ON(cursor->last_piece); + BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter)); + cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter); + return true; +} +#endif /* CONFIG_BLOCK */ - bio_advance_iter(bio, &cursor->bvec_iter, bytes); +static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor, + size_t length) +{ + struct ceph_msg_data *data = cursor->data; + struct bio_vec 
*bvecs = data->bvec_pos.bvecs; - if (bytes < bio_vec.bv_len) - return false; /* more bytes to process in this segment */ + cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size); + cursor->bvec_iter = data->bvec_pos.iter; + cursor->bvec_iter.bi_size = cursor->resid; - /* Move on to the next segment, and possibly the next bio */ + BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter)); + cursor->last_piece = + cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter); +} - if (!cursor->bvec_iter.bi_size) { - bio = bio->bi_next; - cursor->bio = bio; - if (bio) - cursor->bvec_iter = bio->bi_iter; - else - memset(&cursor->bvec_iter, 0, - sizeof(cursor->bvec_iter)); - } +static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor, + size_t *page_offset, + size_t *length) +{ + struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs, + cursor->bvec_iter); + + *page_offset = bv.bv_offset; + *length = bv.bv_len; + return bv.bv_page; +} + +static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor, + size_t bytes) +{ + struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs; + + BUG_ON(bytes > cursor->resid); + BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter)); + cursor->resid -= bytes; + bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes); - if (!cursor->last_piece) { - BUG_ON(!cursor->resid); - BUG_ON(!bio); - /* A short read is OK, so use <= rather than == */ - if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter)) - cursor->last_piece = true; + if (!cursor->resid) { + BUG_ON(!cursor->last_piece); + return false; /* no more data */ } + if (!bytes || cursor->bvec_iter.bi_bvec_done) + return false; /* more bytes to process in this segment */ + + BUG_ON(cursor->last_piece); + BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter)); + cursor->last_piece = + cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter); return true; } -#endif /* CONFIG_BLOCK */ /* * For a page array, a piece comes from the first page in the array @@ -1110,6 +1127,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor) ceph_msg_data_bio_cursor_init(cursor, length); break; #endif /* CONFIG_BLOCK */ + case CEPH_MSG_DATA_BVECS: + ceph_msg_data_bvecs_cursor_init(cursor, length); + break; case CEPH_MSG_DATA_NONE: default: /* BUG(); */ @@ -1158,14 +1178,19 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, page = ceph_msg_data_bio_next(cursor, page_offset, length); break; #endif /* CONFIG_BLOCK */ + case CEPH_MSG_DATA_BVECS: + page = ceph_msg_data_bvecs_next(cursor, page_offset, length); + break; case CEPH_MSG_DATA_NONE: default: page = NULL; break; } + BUG_ON(!page); BUG_ON(*page_offset + *length > PAGE_SIZE); BUG_ON(!*length); + BUG_ON(*length > cursor->resid); if (last_piece) *last_piece = cursor->last_piece; @@ -1194,6 +1219,9 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, new_piece = ceph_msg_data_bio_advance(cursor, bytes); break; #endif /* CONFIG_BLOCK */ + case CEPH_MSG_DATA_BVECS: + new_piece = ceph_msg_data_bvecs_advance(cursor, bytes); + break; case CEPH_MSG_DATA_NONE: default: BUG(); @@ -1575,13 +1603,18 @@ static int write_partial_message_data(struct ceph_connection *con) * been revoked, so use the zero page. */ crc = do_datacrc ? 
le32_to_cpu(msg->footer.data_crc) : 0; - while (cursor->resid) { + while (cursor->total_resid) { struct page *page; size_t page_offset; size_t length; bool last_piece; int ret; + if (!cursor->resid) { + ceph_msg_data_advance(cursor, 0); + continue; + } + page = ceph_msg_data_next(cursor, &page_offset, &length, &last_piece); ret = ceph_tcp_sendpage(con->sock, page, page_offset, @@ -2297,7 +2330,12 @@ static int read_partial_msg_data(struct ceph_connection *con) if (do_datacrc) crc = con->in_data_crc; - while (cursor->resid) { + while (cursor->total_resid) { + if (!cursor->resid) { + ceph_msg_data_advance(cursor, 0); + continue; + } + page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); if (ret <= 0) { @@ -3262,16 +3300,14 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg, EXPORT_SYMBOL(ceph_msg_data_add_pagelist); #ifdef CONFIG_BLOCK -void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio, - size_t length) +void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, + u32 length) { struct ceph_msg_data *data; - BUG_ON(!bio); - data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); BUG_ON(!data); - data->bio = bio; + data->bio_pos = *bio_pos; data->bio_length = length; list_add_tail(&data->links, &msg->data); @@ -3280,6 +3316,20 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio, EXPORT_SYMBOL(ceph_msg_data_add_bio); #endif /* CONFIG_BLOCK */ +void ceph_msg_data_add_bvecs(struct ceph_msg *msg, + struct ceph_bvec_iter *bvec_pos) +{ + struct ceph_msg_data *data; + + data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS); + BUG_ON(!data); + data->bvec_pos = *bvec_pos; + + list_add_tail(&data->links, &msg->data); + msg->data_length += bvec_pos->iter.bi_size; +} +EXPORT_SYMBOL(ceph_msg_data_add_bvecs); + /* * construct a new message with given type, size * the new msg has a ref count of 1. diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 1547107f4854..b3dac24412d3 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -60,7 +60,7 @@ struct ceph_monmap *ceph_monmap_decode(void *p, void *end) num_mon = ceph_decode_32(&p); ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); - if (num_mon >= CEPH_MAX_MON) + if (num_mon > CEPH_MAX_MON) goto bad; m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); if (m == NULL) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 2814dba5902d..ea2a6c9fb7ce 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -20,6 +20,7 @@ #include <linux/ceph/decode.h> #include <linux/ceph/auth.h> #include <linux/ceph/pagelist.h> +#include <linux/ceph/striper.h> #define OSD_OPREPLY_FRONT_LEN 512 @@ -103,13 +104,12 @@ static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, u64 *objnum, u64 *objoff, u64 *objlen) { u64 orig_len = *plen; - int r; + u32 xlen; /* object extent? 
*/ - r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum, - objoff, objlen); - if (r < 0) - return r; + ceph_calc_file_object_mapping(layout, off, orig_len, objnum, + objoff, &xlen); + *objlen = xlen; if (*objlen < orig_len) { *plen = *objlen; dout(" skipping last %llu, final file extent %llu~%llu\n", @@ -117,7 +117,6 @@ static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, } dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); - return 0; } @@ -148,14 +147,22 @@ static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, #ifdef CONFIG_BLOCK static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, - struct bio *bio, size_t bio_length) + struct ceph_bio_iter *bio_pos, + u32 bio_length) { osd_data->type = CEPH_OSD_DATA_TYPE_BIO; - osd_data->bio = bio; + osd_data->bio_pos = *bio_pos; osd_data->bio_length = bio_length; } #endif /* CONFIG_BLOCK */ +static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data, + struct ceph_bvec_iter *bvec_pos) +{ + osd_data->type = CEPH_OSD_DATA_TYPE_BVECS; + osd_data->bvec_pos = *bvec_pos; +} + #define osd_req_op_data(oreq, whch, typ, fld) \ ({ \ struct ceph_osd_request *__oreq = (oreq); \ @@ -218,16 +225,29 @@ EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist); #ifdef CONFIG_BLOCK void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, - unsigned int which, struct bio *bio, size_t bio_length) + unsigned int which, + struct ceph_bio_iter *bio_pos, + u32 bio_length) { struct ceph_osd_data *osd_data; osd_data = osd_req_op_data(osd_req, which, extent, osd_data); - ceph_osd_data_bio_init(osd_data, bio, bio_length); + ceph_osd_data_bio_init(osd_data, bio_pos, bio_length); } EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio); #endif /* CONFIG_BLOCK */ +void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req, + unsigned int which, + struct ceph_bvec_iter *bvec_pos) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_data(osd_req, which, extent, osd_data); + ceph_osd_data_bvecs_init(osd_data, bvec_pos); +} +EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos); + static void osd_req_op_cls_request_info_pagelist( struct ceph_osd_request *osd_req, unsigned int which, struct ceph_pagelist *pagelist) @@ -265,6 +285,23 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_cls_request_data_pages); +void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req, + unsigned int which, + struct bio_vec *bvecs, u32 bytes) +{ + struct ceph_osd_data *osd_data; + struct ceph_bvec_iter it = { + .bvecs = bvecs, + .iter = { .bi_size = bytes }, + }; + + osd_data = osd_req_op_data(osd_req, which, cls, request_data); + ceph_osd_data_bvecs_init(osd_data, &it); + osd_req->r_ops[which].cls.indata_len += bytes; + osd_req->r_ops[which].indata_len += bytes; +} +EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs); + void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req, unsigned int which, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages) @@ -290,6 +327,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) case CEPH_OSD_DATA_TYPE_BIO: return (u64)osd_data->bio_length; #endif /* CONFIG_BLOCK */ + case CEPH_OSD_DATA_TYPE_BVECS: + return osd_data->bvec_pos.iter.bi_size; default: WARN(true, "unrecognized data type %d\n", (int)osd_data->type); return 0; @@ -828,8 +867,10 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg, ceph_msg_data_add_pagelist(msg, 
osd_data->pagelist); #ifdef CONFIG_BLOCK } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { - ceph_msg_data_add_bio(msg, osd_data->bio, length); + ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length); #endif + } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) { + ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos); } else { BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); } @@ -5065,7 +5106,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, } EXPORT_SYMBOL(ceph_osdc_writepages); -int ceph_osdc_setup(void) +int __init ceph_osdc_setup(void) { size_t size = sizeof(struct ceph_osd_request) + CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op); @@ -5076,7 +5117,6 @@ int ceph_osdc_setup(void) return ceph_osd_request_cache ? 0 : -ENOMEM; } -EXPORT_SYMBOL(ceph_osdc_setup); void ceph_osdc_cleanup(void) { @@ -5084,7 +5124,6 @@ void ceph_osdc_cleanup(void) kmem_cache_destroy(ceph_osd_request_cache); ceph_osd_request_cache = NULL; } -EXPORT_SYMBOL(ceph_osdc_cleanup); /* * handle incoming message diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 0da27c66349a..9645ffd6acfb 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -4,7 +4,6 @@ #include <linux/module.h> #include <linux/slab.h> -#include <asm/div64.h> #include <linux/ceph/libceph.h> #include <linux/ceph/osdmap.h> @@ -2141,76 +2140,6 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting, } /* - * calculate file layout from given offset, length. - * fill in correct oid, logical length, and object extent - * offset, length. - * - * for now, we write only a single su, until we can - * pass a stride back to the caller. - */ -int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, - u64 off, u64 len, - u64 *ono, - u64 *oxoff, u64 *oxlen) -{ - u32 osize = layout->object_size; - u32 su = layout->stripe_unit; - u32 sc = layout->stripe_count; - u32 bl, stripeno, stripepos, objsetno; - u32 su_per_object; - u64 t, su_offset; - - dout("mapping %llu~%llu osize %u fl_su %u\n", off, len, - osize, su); - if (su == 0 || sc == 0) - goto invalid; - su_per_object = osize / su; - if (su_per_object == 0) - goto invalid; - dout("osize %u / su %u = su_per_object %u\n", osize, su, - su_per_object); - - if ((su & ~PAGE_MASK) != 0) - goto invalid; - - /* bl = *off / su; */ - t = off; - do_div(t, su); - bl = t; - dout("off %llu / su %u = bl %u\n", off, su, bl); - - stripeno = bl / sc; - stripepos = bl % sc; - objsetno = stripeno / su_per_object; - - *ono = objsetno * sc + stripepos; - dout("objset %u * sc %u = ono %u\n", objsetno, sc, (unsigned int)*ono); - - /* *oxoff = *off % layout->fl_stripe_unit; # offset in su */ - t = off; - su_offset = do_div(t, su); - *oxoff = su_offset + (stripeno % su_per_object) * su; - - /* - * Calculate the length of the extent being written to the selected - * object. This is the minimum of the full length requested (len) or - * the remainder of the current stripe being written to. - */ - *oxlen = min_t(u64, len, su - su_offset); - - dout(" obj extent %llu~%llu\n", *oxoff, *oxlen); - return 0; - -invalid: - dout(" invalid layout\n"); - *ono = 0; - *oxoff = 0; - *oxlen = 0; - return -EINVAL; -} -EXPORT_SYMBOL(ceph_calc_file_object_mapping); - -/* * Map an object into a PG. 
* * Should only be called with target_oid and target_oloc (as opposed to diff --git a/net/ceph/striper.c b/net/ceph/striper.c new file mode 100644 index 000000000000..c36462dc86b7 --- /dev/null +++ b/net/ceph/striper.c @@ -0,0 +1,261 @@ +/* SPDX-License-Identifier: GPL-2.0 */ + +#include <linux/ceph/ceph_debug.h> + +#include <linux/math64.h> +#include <linux/slab.h> + +#include <linux/ceph/striper.h> +#include <linux/ceph/types.h> + +/* + * Map a file extent to a stripe unit within an object. + * Fill in objno, offset into object, and object extent length (i.e. the + * number of bytes mapped, less than or equal to @l->stripe_unit). + * + * Example for stripe_count = 3, stripes_per_object = 4: + * + * blockno | 0 3 6 9 | 1 4 7 10 | 2 5 8 11 | 12 15 18 21 | 13 16 19 + * stripeno | 0 1 2 3 | 0 1 2 3 | 0 1 2 3 | 4 5 6 7 | 4 5 6 + * stripepos | 0 | 1 | 2 | 0 | 1 + * objno | 0 | 1 | 2 | 3 | 4 + * objsetno | 0 | 1 + */ +void ceph_calc_file_object_mapping(struct ceph_file_layout *l, + u64 off, u64 len, + u64 *objno, u64 *objoff, u32 *xlen) +{ + u32 stripes_per_object = l->object_size / l->stripe_unit; + u64 blockno; /* which su in the file (i.e. globally) */ + u32 blockoff; /* offset into su */ + u64 stripeno; /* which stripe */ + u32 stripepos; /* which su in the stripe, + which object in the object set */ + u64 objsetno; /* which object set */ + u32 objsetpos; /* which stripe in the object set */ + + blockno = div_u64_rem(off, l->stripe_unit, &blockoff); + stripeno = div_u64_rem(blockno, l->stripe_count, &stripepos); + objsetno = div_u64_rem(stripeno, stripes_per_object, &objsetpos); + + *objno = objsetno * l->stripe_count + stripepos; + *objoff = objsetpos * l->stripe_unit + blockoff; + *xlen = min_t(u64, len, l->stripe_unit - blockoff); +} +EXPORT_SYMBOL(ceph_calc_file_object_mapping); + +/* + * Return the last extent with given objno (@object_extents is sorted + * by objno). If not found, return NULL and set @add_pos so that the + * new extent can be added with list_add(add_pos, new_ex). + */ +static struct ceph_object_extent * +lookup_last(struct list_head *object_extents, u64 objno, + struct list_head **add_pos) +{ + struct list_head *pos; + + list_for_each_prev(pos, object_extents) { + struct ceph_object_extent *ex = + list_entry(pos, typeof(*ex), oe_item); + + if (ex->oe_objno == objno) + return ex; + + if (ex->oe_objno < objno) + break; + } + + *add_pos = pos; + return NULL; +} + +static struct ceph_object_extent * +lookup_containing(struct list_head *object_extents, u64 objno, + u64 objoff, u32 xlen) +{ + struct ceph_object_extent *ex; + + list_for_each_entry(ex, object_extents, oe_item) { + if (ex->oe_objno == objno && + ex->oe_off <= objoff && + ex->oe_off + ex->oe_len >= objoff + xlen) /* paranoia */ + return ex; + + if (ex->oe_objno > objno) + break; + } + + return NULL; +} + +/* + * Map a file extent to a sorted list of object extents. + * + * We want only one (or as few as possible) object extents per object. + * Adjacent object extents will be merged together, each returned object + * extent may reverse map to multiple different file extents. + * + * Call @alloc_fn for each new object extent and @action_fn for each + * mapped stripe unit, whether it was merged into an already allocated + * object extent or started a new object extent. + * + * Newly allocated object extents are added to @object_extents. + * To keep @object_extents sorted, successive calls to this function + * must map successive file extents (i.e. 
the list of file extents that + * are mapped using the same @object_extents must be sorted). + * + * The caller is responsible for @object_extents. + */ +int ceph_file_to_extents(struct ceph_file_layout *l, u64 off, u64 len, + struct list_head *object_extents, + struct ceph_object_extent *alloc_fn(void *arg), + void *alloc_arg, + ceph_object_extent_fn_t action_fn, + void *action_arg) +{ + struct ceph_object_extent *last_ex, *ex; + + while (len) { + struct list_head *add_pos = NULL; + u64 objno, objoff; + u32 xlen; + + ceph_calc_file_object_mapping(l, off, len, &objno, &objoff, + &xlen); + + last_ex = lookup_last(object_extents, objno, &add_pos); + if (!last_ex || last_ex->oe_off + last_ex->oe_len != objoff) { + ex = alloc_fn(alloc_arg); + if (!ex) + return -ENOMEM; + + ex->oe_objno = objno; + ex->oe_off = objoff; + ex->oe_len = xlen; + if (action_fn) + action_fn(ex, xlen, action_arg); + + if (!last_ex) + list_add(&ex->oe_item, add_pos); + else + list_add(&ex->oe_item, &last_ex->oe_item); + } else { + last_ex->oe_len += xlen; + if (action_fn) + action_fn(last_ex, xlen, action_arg); + } + + off += xlen; + len -= xlen; + } + + for (last_ex = list_first_entry(object_extents, typeof(*ex), oe_item), + ex = list_next_entry(last_ex, oe_item); + &ex->oe_item != object_extents; + last_ex = ex, ex = list_next_entry(ex, oe_item)) { + if (last_ex->oe_objno > ex->oe_objno || + (last_ex->oe_objno == ex->oe_objno && + last_ex->oe_off + last_ex->oe_len >= ex->oe_off)) { + WARN(1, "%s: object_extents list not sorted!\n", + __func__); + return -EINVAL; + } + } + + return 0; +} +EXPORT_SYMBOL(ceph_file_to_extents); + +/* + * A stripped down, non-allocating version of ceph_file_to_extents(), + * for when @object_extents is already populated. + */ +int ceph_iterate_extents(struct ceph_file_layout *l, u64 off, u64 len, + struct list_head *object_extents, + ceph_object_extent_fn_t action_fn, + void *action_arg) +{ + while (len) { + struct ceph_object_extent *ex; + u64 objno, objoff; + u32 xlen; + + ceph_calc_file_object_mapping(l, off, len, &objno, &objoff, + &xlen); + + ex = lookup_containing(object_extents, objno, objoff, xlen); + if (!ex) { + WARN(1, "%s: objno %llu %llu~%u not found!\n", + __func__, objno, objoff, xlen); + return -EINVAL; + } + + action_fn(ex, xlen, action_arg); + + off += xlen; + len -= xlen; + } + + return 0; +} +EXPORT_SYMBOL(ceph_iterate_extents); + +/* + * Reverse map an object extent to a sorted list of file extents. 
+ * + * On success, the caller is responsible for: + * + * kfree(file_extents) + */ +int ceph_extent_to_file(struct ceph_file_layout *l, + u64 objno, u64 objoff, u64 objlen, + struct ceph_file_extent **file_extents, + u32 *num_file_extents) +{ + u32 stripes_per_object = l->object_size / l->stripe_unit; + u64 blockno; /* which su */ + u32 blockoff; /* offset into su */ + u64 stripeno; /* which stripe */ + u32 stripepos; /* which su in the stripe, + which object in the object set */ + u64 objsetno; /* which object set */ + u32 i = 0; + + if (!objlen) { + *file_extents = NULL; + *num_file_extents = 0; + return 0; + } + + *num_file_extents = DIV_ROUND_UP_ULL(objoff + objlen, l->stripe_unit) - + DIV_ROUND_DOWN_ULL(objoff, l->stripe_unit); + *file_extents = kmalloc_array(*num_file_extents, sizeof(**file_extents), + GFP_NOIO); + if (!*file_extents) + return -ENOMEM; + + div_u64_rem(objoff, l->stripe_unit, &blockoff); + while (objlen) { + u64 off, len; + + objsetno = div_u64_rem(objno, l->stripe_count, &stripepos); + stripeno = div_u64(objoff, l->stripe_unit) + + objsetno * stripes_per_object; + blockno = stripeno * l->stripe_count + stripepos; + off = blockno * l->stripe_unit + blockoff; + len = min_t(u64, objlen, l->stripe_unit - blockoff); + + (*file_extents)[i].fe_off = off; + (*file_extents)[i].fe_len = len; + + blockoff = 0; + objoff += len; + objlen -= len; + i++; + } + + BUG_ON(i != *num_file_extents); + return 0; +} +EXPORT_SYMBOL(ceph_extent_to_file);
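
A note on the __init annotations added to ceph_debugfs_init(), ceph_msgr_init(), ceph_osdc_setup() and friends above: __init places a function in .init.text, which the kernel frees once initialization completes, so the matching EXPORT_SYMBOL()s had to be dropped at the same time -- an exported __init symbol would let another module call into already-freed memory. A minimal sketch of the pattern, with hypothetical names:

#include <linux/init.h>
#include <linux/module.h>

/* placed in .init.text; discarded once the module finishes loading */
static int __init demo_init(void)
{
	pr_info("demo: init\n");
	return 0;
}

static void __exit demo_exit(void)
{
	pr_info("demo: exit\n");
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");
/* deliberately no EXPORT_SYMBOL(demo_init): the code is gone after init */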
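The reworked messenger cursors above maintain two invariants the old bio cursor did not: resid may never be smaller than the current segment (the old code tolerated short reads and used <=), and last_piece is set only on exact equality. Below is a userspace model of the bvec cursor's advance step, using stand-in types that are only loosely shaped like struct bio_vec/bvec_iter, to show how it distinguishes "more bytes in this segment", "moved to a new piece", and "done":

#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct bvec { size_t len; };                 /* stand-in for struct bio_vec */

struct cursor {
	const struct bvec *bvecs;
	size_t idx, bvec_done;               /* like bvec_iter.bi_bvec_done */
	size_t resid;
	bool last_piece;
};

static size_t cur_len(const struct cursor *c)
{
	return c->bvecs[c->idx].len - c->bvec_done;
}

static bool advance(struct cursor *c, size_t bytes)
{
	assert(bytes <= c->resid && bytes <= cur_len(c));
	c->resid -= bytes;
	c->bvec_done += bytes;
	if (c->bvec_done == c->bvecs[c->idx].len) {
		c->idx++;                    /* crossed a segment boundary */
		c->bvec_done = 0;
	}
	if (!c->resid)
		return false;                /* no more data */
	if (!bytes || c->bvec_done)
		return false;                /* still inside this segment */
	assert(c->resid >= cur_len(c));      /* the new invariant */
	c->last_piece = c->resid == cur_len(c);
	return true;                         /* moved to a new piece */
}

int main(void)
{
	const struct bvec v[] = { { 512 }, { 1024 } };
	struct cursor c = { .bvecs = v, .resid = 1536, .last_piece = false };

	advance(&c, 512);                    /* consume the first segment */
	printf("last_piece=%d resid=%zu\n", c.last_piece, c.resid);
	return 0;                            /* last_piece=1 resid=1024 */
}

After the first 512-byte segment is consumed, the remaining 1024 bytes exactly fill the second segment, so last_piece flips to true on the equality test rather than on <=.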
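The mapping in ceph_calc_file_object_mapping() is pure integer arithmetic, so it is easy to sanity-check outside the kernel. Here is a standalone re-implementation (a userspace sketch only; the kernel version uses div_u64_rem() because 64-bit division is expensive on 32-bit architectures), using the same layout as the comment in striper.c -- stripe_count = 3, stripes_per_object = 4 -- with an assumed 4K stripe unit:

#include <stdint.h>
#include <stdio.h>

struct layout { uint32_t stripe_unit, stripe_count, object_size; };

static void map_file_extent(const struct layout *l, uint64_t off, uint64_t len,
			    uint64_t *objno, uint64_t *objoff, uint32_t *xlen)
{
	uint32_t stripes_per_object = l->object_size / l->stripe_unit;
	uint64_t blockno = off / l->stripe_unit;   /* which su, globally */
	uint32_t blockoff = off % l->stripe_unit;  /* offset into that su */
	uint64_t stripeno = blockno / l->stripe_count;
	uint32_t stripepos = blockno % l->stripe_count;
	uint64_t objsetno = stripeno / stripes_per_object;
	uint32_t objsetpos = stripeno % stripes_per_object;

	*objno = objsetno * l->stripe_count + stripepos;
	*objoff = (uint64_t)objsetpos * l->stripe_unit + blockoff;
	/* clip to what is left of the current stripe unit */
	*xlen = len < l->stripe_unit - blockoff ? len
						: l->stripe_unit - blockoff;
}

int main(void)
{
	/* stripe_count = 3, stripes_per_object = 4, as in the striper.c table */
	struct layout l = { .stripe_unit = 4096, .stripe_count = 3,
			    .object_size = 16384 };
	uint64_t objno, objoff;
	uint32_t xlen;

	map_file_extent(&l, 50000, 10000, &objno, &objoff, &xlen);
	printf("objno=%llu objoff=%llu xlen=%u\n",
	       (unsigned long long)objno, (unsigned long long)objoff, xlen);
	return 0; /* prints objno=3 objoff=848 xlen=3248 */
}

Offset 50000 falls in block 12 at offset 848; per the blockno/objno table in the striper.c comment, block 12 belongs to objno 3 (the start of the second object set), and the extent is clipped to the 3248 bytes left in that stripe unit.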
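How a caller is expected to drive ceph_file_to_extents() follows from its signature: alloc_fn hands back a ceph_object_extent (typically embedded in a caller-private struct so it can carry per-object state) and action_fn fires once per mapped stripe unit, whether the unit was merged into an existing extent or started a new one. A hedged sketch -- struct and function names here are hypothetical, not taken from the patch:

#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/ceph/striper.h>

struct my_extent {
	struct ceph_object_extent oe;	/* embedded; links into the list */
	u64 bytes;			/* private per-object byte count */
};

static struct ceph_object_extent *my_alloc(void *arg)
{
	struct my_extent *ex = kzalloc(sizeof(*ex), GFP_NOIO);

	return ex ? &ex->oe : NULL;	/* NULL propagates as -ENOMEM */
}

static void my_count(struct ceph_object_extent *oe, u32 bytes, void *arg)
{
	struct my_extent *ex = container_of(oe, struct my_extent, oe);

	ex->bytes += bytes;		/* called once per mapped su */
}

static int map_request(struct ceph_file_layout *layout,
		       struct list_head *extents, u64 off, u64 len)
{
	/* after this, each touched object has one merged extent on the list */
	return ceph_file_to_extents(layout, off, len, extents,
				    my_alloc, NULL, my_count, NULL);
}

Because successive calls against the same list must map successive file extents, a caller builds the list for one request, acts on it, and owns the cleanup (walking the list and freeing each container).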
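Going the other way, ceph_extent_to_file() walks the object extent one stripe unit at a time, recomputing the global block number from (objno, objoff). The same userspace-sketch approach round-trips the forward-mapping example above -- objno 3, objoff 848, objlen 3248 should map back to the single file extent 50000~3248:

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	const uint32_t su = 4096, sc = 3, spo = 4; /* stripes_per_object */
	uint64_t objno = 3, objoff = 848, objlen = 3248;
	uint32_t blockoff = objoff % su;	/* offset into the first su */

	while (objlen) {
		uint64_t objsetno = objno / sc;
		uint32_t stripepos = objno % sc;
		uint64_t stripeno = objoff / su + objsetno * spo;
		uint64_t blockno = stripeno * sc + stripepos;
		uint64_t off = blockno * su + blockoff;
		uint64_t len = objlen < su - blockoff ? objlen
						      : su - blockoff;

		/* the kernel version stores these in file_extents[i] */
		printf("file extent %llu~%llu\n",
		       (unsigned long long)off, (unsigned long long)len);

		blockoff = 0;		/* later units start su-aligned */
		objoff += len;
		objlen -= len;
	}
	return 0; /* prints "file extent 50000~3248" */
}

The num_file_extents precomputation in the kernel code counts stripe-unit boundaries crossed by [objoff, objoff+objlen); here that is ceil(4096/4096) - floor(848/4096) = 1, matching the single extent the loop emits.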