[RFC PATCH v2] dm-csum: A new device mapper target that checks data integrity

From: Alberto Bertogli
Date: Tue May 26 2009 - 15:51:59 EST


On Thu, May 21, 2009 at 01:13:17PM -0300, Alberto Bertogli wrote:
> I'm writing this device mapper target that stores checksums on writes and
> verifies them on reads.

Here's a new version of the patch, against Linus' current git tree. The most
important change since the first version is support for the bio-integrity
extensions.

As with the previous version, it has only been lightly tested (creating a
filesystem and doing some basic file manipulation over a loop device).
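
For reference, a target can be created with dmsetup using table lines along
these lines (sector counts and device names are only illustrative):

  # data and integrity metadata interleaved on the same device
  echo "0 63488 csum /dev/sdb 0" | dmsetup create csum-test

  # integrity metadata kept on a separate device
  echo "0 63488 csum /dev/sdb 0 /dev/sdc 0" | dmsetup create csum-test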

Thanks a lot,
Alberto

diff --git a/drivers/md/Kconfig b/drivers/md/Kconfig
index 36e0675..081e9bc 100644
--- a/drivers/md/Kconfig
+++ b/drivers/md/Kconfig
@@ -258,6 +258,16 @@ config DM_DELAY

If unsure, say N.

+config DM_CSUM
+ tristate "Checksumming target (EXPERIMENTAL)"
+ depends on BLK_DEV_DM && EXPERIMENTAL
+ select CRC_CCITT
+ ---help---
+ A target that stores checksums on writes, and verifies
+ them on reads.
+
+ If unsure, say N.
+
config DM_UEVENT
bool "DM uevents (EXPERIMENTAL)"
depends on BLK_DEV_DM && EXPERIMENTAL
diff --git a/drivers/md/Makefile b/drivers/md/Makefile
index 45cc595..f938787 100644
--- a/drivers/md/Makefile
+++ b/drivers/md/Makefile
@@ -39,6 +39,7 @@ obj-$(CONFIG_DM_MULTIPATH) += dm-multipath.o dm-round-robin.o
obj-$(CONFIG_DM_SNAPSHOT) += dm-snapshot.o
obj-$(CONFIG_DM_MIRROR) += dm-mirror.o dm-log.o dm-region-hash.o
obj-$(CONFIG_DM_ZERO) += dm-zero.o
+obj-$(CONFIG_DM_CSUM) += dm-csum.o

quiet_cmd_unroll = UNROLL $@
cmd_unroll = $(PERL) $(srctree)/$(src)/unroll.pl $(UNROLL) \
diff --git a/drivers/md/dm-csum.c b/drivers/md/dm-csum.c
new file mode 100644
index 0000000..809cd1c
--- /dev/null
+++ b/drivers/md/dm-csum.c
@@ -0,0 +1,1537 @@
+/*
+ * A target that stores checksums on writes, and verifies them on reads.
+ * Alberto Bertogli <albertito@xxxxxxxxxxxxxx>
+ *
+ * This device-mapper module provides data integrity verification by storing
+ * checksums on writes, and verifying them on reads.
+ *
+ *
+ * On-disk format
+ * --------------
+ *
+ * It stores an 8-byte "integrity metadata" ("imd", from now on) structure for
+ * each 512-byte data sector. imd structures are clustered in groups of 62
+ * plus a small header, so they fit in a sector (referred to as an "imd sector").
+ * Every imd sector has a "brother", another adjacent imd sector, for
+ * consistency purposes (explained below). That means we devote two sectors to
+ * imd storage for every 62 data sectors.
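+ * (The arithmetic: an 8-byte header plus 62 8-byte imd structures is 504
+ * bytes, which fits in a single 512-byte imd sector.)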
+ *
+ * The imd structure consists of:
+ * - 16 bit CRC (CCITT) (big endian)
+ * - 16 bit flags (big endian)
+ * - 32 bit tag
+ *
+ * The CRC is, obviously, the CRC of the sector this structure refers to. The
+ * flags are unused at the moment. The tag is not used by this module, but
+ * made available to the upper layers through the integrity framework.
+ *
+ * The imd sector header contains a mark of the last update, so given two
+ * brothers we can determine which one is younger.
+ *
+ *
+ * We can either use the same device to store data sectors and imd sectors, or
+ * store each in a different device. If only one device is used, the sectors
+ * are interleaved: every group of 62 data sectors is preceded by the two
+ * imd sectors that describe it.
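+ * For example (assuming a zero data offset): device sector 0 is reserved,
+ * sectors 1-2 hold the imd for data sectors 0-61 (which live at device
+ * sectors 3-64), sectors 65-66 hold the imd for data sectors 62-123 (device
+ * sectors 67-128), and so on.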
+ *
+ *
+ * Write procedure
+ * ---------------
+ *
+ * To guarantee consistency, two imd sectors (named M1 and M2) are kept for
+ * every 62 data sectors, and the following procedure is used to update them
+ * when a write to a given sector is required:
+ *
+ * - Read both M1 and M2.
+ * - Find out (using information stored in their headers) which one is newer.
+ * Let's assume M1 is newer than M2.
+ * - Update the M2 buffer to mark it as the newer one, and store the new
+ * data's CRC in it.
+ * - Submit the write to M2, and then the write to the data, using a barrier
+ * to make sure the metadata is updated _before_ the data.
+ *
+ * Accordingly, the read operations are handled as follows:
+ *
+ * - Read both the data, M1 and M2.
+ * - Find out which one is newer. Let's assume M1 is newer than M2.
+ * - Calculate the data's CRC, and compare it to the one found in M1. If they
+ * match, the reading is successful. If not, compare it to the one found in
+ * M2. If they match, the reading is successful; otherwise, fail. If
+ * the read involves multiple sectors, it is possible that some of the
+ * correct CRCs are in M1 and some in M2.
+ *
+ * This scheme assumes that single sector writes are atomic in the presence of
+ * a crash.
+ * XXX: is this a reasonable assumption?
+ *
+ * TODO: would it be better to have M1 and M2 apart, to improve the chances of
+ * recovery in case of a failure?
+ *
+ * A simple locking structure is used to prevent simultaneous changes to the
+ * imd sectors.
+ *
+ * A last_updated counter is stored in the imd sector header and is used to
+ * find out whether a given imd sector is newer than its brother. When
+ * writing out an imd sector, we increase its counter by 2.
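+ *
+ * For example (this mirrors older_imd() below): with counters (M1 = 2,
+ * M2 = 3), M1 is the older copy, so the next update rewrites M1 and bumps
+ * its counter to 4. The counter is 8 bits wide, so it wraps around:
+ * (M1 = 0, M2 = 255) means M1 just wrapped from 254 and is in fact the
+ * newer copy, making M2 the older one.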
+ *
+ *
+ * Code overview
+ * -------------
+ *
+ * The code uses the term "nice bio" to refer to a bio whose sectors are all
+ * covered by a single imd sector. Otherwise, the bio is "evil".
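+ * For example, with 62 data sectors per imd sector, an 8-sector bio
+ * starting at (target-relative) sector 60 spans two imd groups and is evil,
+ * while the same bio starting at sector 0 is nice.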
+ *
+ * The bulk of the code is the read and write handling, which is only designed
+ * to work with nice bios for simplicity. There's additional
+ * direction-independent code to split evil bios into nice ones.
+ *
+ * The rest is mostly concerned with device-mapper and module stuff.
+ *
+ * The code is divided in the following sections:
+ *
+ * - Generic and miscellaneous code, including the csum_c structure used to
+ * track a single csum device, and the functions used to manipulate sector
+ * numbers.
+ * - bio-integrity.
+ * - imd generation and verification.
+ * - Read handling (remember: only for nice bios).
+ * - Write handling (idem).
+ * - Work queue.
+ * - Evil bios handling.
+ * - Device mapper constructor, destructor and mapper functions.
+ * - DM target registration and module stuff.
+ *
+ */
+
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/kernel.h>
+#include <linux/bio.h>
+#include <linux/slab.h>
+#include <linux/crc-ccitt.h>
+#include <linux/spinlock.h>
+#include <linux/mutex.h>
+#include <asm/atomic.h>
+#include <linux/device-mapper.h>
+#include <linux/workqueue.h>
+
+#define DM_MSG_PREFIX "csum"
+
+#if 1
+ #define dprint(...) printk(KERN_DEBUG __VA_ARGS__)
+#else
+ #define dprint(...)
+#endif
+
+
+/* Context information for device mapper */
+
+typedef sector_t (map_data_sector_fn) (struct dm_target *ti, sector_t data);
+typedef sector_t (get_imd_sector_fn) (struct dm_target *ti, sector_t data);
+
+struct csum_c {
+ /* data backing device */
+ struct dm_dev *data_dev;
+ sector_t data_start;
+
+ /* imd backing device (can be the same as data_dev) */
+ struct dm_dev *imd_dev;
+ sector_t imd_start;
+ sector_t imd_len;
+
+ map_data_sector_fn *map_data_sector;
+ get_imd_sector_fn *get_imd_sector;
+};
+
+
+/* TODO: use decent locking. At the moment, this semaphore is taken (in the
+ * submission workqueue) before each bio is processed, and released (from the
+ * io workqueue) once that bio's processing has completed. This is needed to
+ * avoid concurrent accesses to the imd sectors. In the future, finer-grained
+ * locking will be implemented. */
+static DECLARE_MUTEX(wq_lock);
+
+
+/*
+ * Utility functions for disk data manipulation
+ */
+
+/* How many sectors we reserve at the beginning of the data device for
+ * identification and device metadata */
+#define RESERVED_INITIAL_SECTORS_D 1
+
+/* If the metadata is on a different device, how many sectors we reserve at
+ * the beginning for identification and device metadata */
+#define RESERVED_INITIAL_SECTORS_M 1
+
+/* How many data sectors for each metadata sector. See the initial comment for
+ * a rationale on the value. */
+#define SECTORS_PER_IMD 62
+
+
+/* Return how many sectors are needed to store the imd information for the
+ * given amount of data sectors */
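+/* For example: imd_sectors_needed(62) == 2, imd_sectors_needed(63) == 4. */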
+static sector_t imd_sectors_needed(sector_t sectors)
+{
+ return dm_sector_div_up(sectors, SECTORS_PER_IMD) * 2;
+}
+
+/* Given a dm device sector, return the corresponding sector on the data
+ * device. We have one function to use when data and metadata are stored
+ * in different devices, and one to use when they're in the same device. Which
+ * one to use will be determined via function pointers in the context
+ * structure. */
+static sector_t map_data_sector_diff(struct dm_target *ti, sector_t data)
+{
+ struct csum_c *cc = ti->private;
+
+ /* When stored in different devices, data is stored directly at the
+ * given offset */
+ return cc->data_start + RESERVED_INITIAL_SECTORS_D +
+ (data - ti->begin);
+}
+
+static sector_t map_data_sector_same(struct dm_target *ti, sector_t data)
+{
+ struct csum_c *cc = ti->private;
+
+ /* When stored in the same device, interleaving makes things a little
+ * more complicated. The offset will be the same as if there was no
+ * interleaving, plus the number of imd sectors.
+ * We call imd_sectors_needed with (data - ti->begin + 1) because it
+ * receives a number of sectors, so 0 means no sectors and not an
+ * offset. */
+ return cc->data_start + RESERVED_INITIAL_SECTORS_D +
+ (data - ti->begin) + imd_sectors_needed(data - ti->begin + 1);
+}
+
+/* Return the imd sector that holds the tuple for the given data sector. Its
+ * brother imd sector will be the result + 1, as they're always adjacent. */
+static sector_t get_imd_sector_diff(struct dm_target *ti, sector_t data)
+{
+ return RESERVED_INITIAL_SECTORS_M +
+ imd_sectors_needed(data - ti->begin + 1);
+}
+
+static sector_t get_imd_sector_same(struct dm_target *ti, sector_t data)
+{
+ sector_t isn = imd_sectors_needed(data - ti->begin + 1);
+
+ return RESERVED_INITIAL_SECTORS_D + SECTORS_PER_IMD * ((isn - 2) / 2)
+ + (isn - 2);
+}
+
+
+/*
+ * Integrity metadata manipulation
+ */
+
+/* Each sector's integrity metadata. We only use crc at the moment. */
+struct imd_tuple {
+ __be16 crc;
+ __be16 flags;
+ __be32 tag;
+} __attribute__ ((packed));
+
+/* imd sector header, holds internal metadata */
+struct imd_sector_header {
+ /* 8 bits is enough for last_updated */
+ u8 last_updated;
+ u8 unused1;
+ __be16 crc;
+ __be32 unused3;
+} __attribute__ ((packed));
+
+/* Return the older of m1 and m2, or NULL if it was impossible to determine */
+static struct imd_sector_header *older_imd(struct imd_sector_header *m1,
+ struct imd_sector_header *m2)
+{
+ int l1, l2;
+
+ /* we get the values into something signed so we can subtract them */
+ l1 = m1->last_updated;
+ l2 = m2->last_updated;
+
+ if (abs(l1 - l2) > 1) {
+ //dprint("wrap-around: %d %d %u\n", l1, l2, abs(l1 - l2));
+ if (l1 == 0) {
+ return m2;
+ } else if (l2 == 0) {
+ return m1;
+ } else {
+ return NULL;
+ }
+ } else {
+ if (l1 > l2) {
+ return m2;
+ } else if (l1 < l2) {
+ return m1;
+ } else {
+ return NULL;
+ }
+ }
+}
+
+/* Return a bio that reads the given imd sectors (both M1 and M2), setting
+ * the bi_bdev to bdev, bi_end_io callback to cb, and bi_private to private.
+ * The returned bio will have a single page allocated, which must be freed. */
+static struct bio *prepare_imd_read(struct block_device *bdev, sector_t sector,
+ bio_end_io_t *cb, void *private)
+{
+ struct page *page = NULL;
+ struct bio *bio = NULL;
+
+ page = alloc_page(GFP_NOIO);
+ if (page == NULL)
+ goto error;
+
+ bio = bio_alloc(GFP_NOIO, 1);
+ if (bio == NULL)
+ goto error;
+
+ bio->bi_bdev = bdev;
+ bio->bi_sector = sector;
+ bio->bi_size = 1024;
+ bio->bi_rw |= READ;
+ bio->bi_end_io = cb;
+ bio->bi_private = private;
+ if (bio_add_page(bio, page, 1024, 0) != 1024)
+ goto error;
+
+ return bio;
+
+error:
+ if (page)
+ __free_page(page);
+ if (bio) {
+ bio->bi_end_io = NULL;
+ bio_put(bio);
+ }
+
+ return NULL;
+}
+
+/* Calculate the CRCs for the sectors in the given bio. It assumes there is
+ * enough space in crc for all the sectors (i.e. crc can hold at least
+ * bio_sectors(bio) 16-bit integers). */
+static void crc_sectors_from_bio(const struct bio *bio, u16 *crc)
+{
+ int segno;
+ struct bio_vec *bvec;
+ unsigned long flags;
+ unsigned int sectors;
+ size_t len;
+ u16 current_crc;
+
+ /* bytes needed to complete the current CRC */
+ unsigned int bytes_needed;
+
+ /* bytes left in the current bvec */
+ unsigned int left_in_bvec;
+
+ sectors = bio_sectors(bio);
+
+ /* XXX: is there really no other way than using bvec_kmap_irq()? */
+ current_crc = 0;
+ bytes_needed = 512;
+ bio_for_each_segment(bvec, bio, segno) {
+ unsigned char *data = bvec_kmap_irq(bvec, &flags);
+ left_in_bvec = bvec->bv_len;
+
+start:
+ len = min(left_in_bvec, bytes_needed);
+ current_crc = crc_ccitt(current_crc, data, len);
+
+ bytes_needed -= len;
+ left_in_bvec -= len;
+
+ if (unlikely(bytes_needed)) {
+ /* we need to go through the next bvec */
+ dprint("next bvec\n");
+ bvec_kunmap_irq(data, &flags);
+ continue;
+ }
+
+ sectors--;
+ *crc = current_crc;
+ crc++;
+ current_crc = 0;
+ bytes_needed = 512;
+
+ if (left_in_bvec && sectors) {
+ /* this bvec still has some data left; if we still
+ * have crcs to calculate, use it for the next one */
+ data += len;
+ goto start;
+ }
+
+ bvec_kunmap_irq(data, &flags);
+ }
+}
+
+
+/*
+ * bio-integrity extensions
+ */
+
+#ifdef CONFIG_BLK_DEV_INTEGRITY
+
+static void imd_generate(struct blk_integrity_exchg *bix)
+{
+ unsigned int i;
+ void *buf = bix->data_buf;
+ struct imd_tuple *imd = bix->prot_buf;
+
+ /* dprint("imd_gen(): s:%llu ss:%u ds:%u\n",
+ (unsigned long long) bix->sector, bix->sector_size,
+ bix->data_size); */
+
+ for (i = 0; i < bix->data_size; i += bix->sector_size) {
+ imd->crc = crc_ccitt(0, buf, bix->sector_size);
+ imd->tag = 0;
+ imd->flags = 0;
+
+ buf += bix->sector_size;
+ imd++;
+ }
+}
+
+static int imd_verify(struct blk_integrity_exchg *bix)
+{
+ unsigned int i;
+ void *buf = bix->data_buf;
+ struct imd_tuple *imd = bix->prot_buf;
+ u16 crc;
+ sector_t sector = bix->sector;
+
+ /* dprint("imd_vfy(): s:%llu ss:%u ds:%u\n", (unsigned long long) sector,
+ bix->sector_size, bix->data_size); */
+
+ for (i = 0; i < bix->data_size; i += bix->sector_size) {
+ crc = crc_ccitt(0, buf, bix->sector_size);
+ if (crc != imd->crc) {
+ printk(KERN_ERR "%s: checksum error on sector %llu"
+ " - disk:%04x imd:%04x\n",
+ bix->disk_name,
+ (unsigned long long) sector, crc,
+ imd->crc);
+ dprint("verify: d:%p p:%p imd:%p\n", bix->data_buf,
+ bix->prot_buf, imd);
+ return -EIO;
+ }
+
+ buf += bix->sector_size;
+ imd++;
+ sector++;
+ }
+
+ return 0;
+}
+
+static void imd_get_tag(void *prot, void *tag_buf, unsigned int sectors)
+{
+ unsigned int i;
+ struct imd_tuple *imd = prot;
+ u16 *tag = tag_buf;
+
+ for (i = 0; i < sectors; i++) {
+ *tag = imd->tag;
+ tag++;
+ imd++;
+ }
+}
+
+static void imd_set_tag(void *prot, void *tag_buf, unsigned int sectors)
+{
+ unsigned int i;
+ struct imd_tuple *imd = prot;
+ u16 *tag = tag_buf;
+
+ for (i = 0; i < sectors; i++) {
+ imd->tag = *tag;
+ tag++;
+ imd++;
+ }
+}
+
+static struct blk_integrity integrity_profile = {
+ .name = "LINUX-DMCSUM-V0-CCITT",
+ .generate_fn = imd_generate,
+ .verify_fn = imd_verify,
+ .get_tag_fn = imd_get_tag,
+ .set_tag_fn = imd_set_tag,
+ .tuple_size = sizeof(struct imd_tuple),
+ .tag_size = sizeof(u16),
+};
+
+static int bi_register(struct dm_target *ti)
+{
+ struct mapped_device *md;
+ struct gendisk *disk;
+
+ md = dm_table_get_md(ti->table);
+ disk = dm_disk(md);
+
+ return blk_integrity_register(disk, &integrity_profile);
+}
+
+static void bi_unregister(struct dm_target *ti)
+{
+ struct mapped_device *md;
+ struct gendisk *disk;
+
+ md = dm_table_get_md(ti->table);
+ disk = dm_disk(md);
+
+ blk_integrity_unregister(disk);
+}
+
+/* Copy the given buffer into the given bip */
+static void copy_to_bip(struct bio_integrity_payload *bip,
+ const unsigned char *buf, unsigned int size)
+{
+ unsigned int i;
+ unsigned int advance;
+ unsigned long flags;
+ struct bio_vec *bvec;
+
+ bip_for_each_vec(bvec, bip, i) {
+ unsigned char *data = bvec_kmap_irq(bvec, &flags);
+
+ advance = min(bvec->bv_len, size);
+
+ memcpy(data, buf, advance);
+
+ buf += advance;
+ size -= advance;
+
+ bvec_kunmap_irq(data, &flags);
+
+ if (size == 0)
+ break;
+ }
+}
+
+/* Set bio's integrity information taking it from imd_bio */
+static void set_bi_from_imd(struct bio *bio, struct bio *imd_bio)
+{
+ unsigned long flags;
+ struct imd_tuple *t;
+ struct bio_integrity_payload *bip = bio->bi_integrity;
+ unsigned char *imd_buf;
+
+ imd_buf = bvec_kmap_irq(bio_iovec(imd_bio), &flags);
+
+ t = (struct imd_tuple *) (imd_buf + sizeof(struct imd_sector_header));
+ t += bio->bi_sector % SECTORS_PER_IMD;
+
+ copy_to_bip(bip, (unsigned char *) t,
+ bio_sectors(bio) * sizeof(struct imd_tuple));
+
+ bvec_kunmap_irq(imd_buf, &flags);
+}
+
+/* Updates bio's integrity information at the given position, taking it from
+ * the given imd tuple */
+static void update_bi_info(struct bio *bio, unsigned int pos,
+ struct imd_tuple *tuple)
+{
+ unsigned long flags;
+ unsigned char *bip_buf;
+ struct imd_tuple *t;
+
+ BUG_ON(bio_integrity(bio) == 0);
+
+ bip_buf = bvec_kmap_irq(bip_vec(bio->bi_integrity), &flags);
+
+ BUG_ON(bip_buf == NULL);
+
+ t = (struct imd_tuple *) bip_buf;
+ t += pos;
+ t->crc = tuple->crc;
+ t->tag = tuple->tag;
+ t->flags = tuple->flags;
+
+ bvec_kunmap_irq(bip_buf, &flags);
+}
+#else /* BLK_DEV_INTEGRITY */
+
+static int bi_register(struct dm_target *ti)
+{
+ return 0;
+}
+
+static void bi_unregister(struct dm_target *ti)
+{
+ return;
+}
+
+static void set_bi_from_imd(struct bio *bio, struct bio *imd_bio)
+{
+ return;
+}
+
+static void update_bi_info(struct bio *bio, unsigned int pos,
+ struct imd_tuple *tuple)
+{
+ return;
+}
+#endif /* BLK_DEV_INTEGRITY */
+
+
+/*
+ * imd generation and verification
+ */
+
+/* Update the imd information for the given data bio. The function deals with
+ * the imd bio directly, which holds one page with both imd sectors (M1 and
+ * M2), as returned from prepare_imd_read(), and assumes it's been read from
+ * disk (so it will only update what's needed).
+ *
+ * Modifies imd_bio so it only writes the sector needed.
+ *
+ * Returns:
+ * - 0 on success
+ * - -1 if there was a memory error
+ * - -2 if there was a consistency error
+ */
+static int update_imd_bio(const struct bio *data_bio, struct bio *imd_bio)
+{
+ int i;
+ u16 *crc;
+ unsigned long flags;
+ unsigned char *imd_buf;
+ struct imd_sector_header *m1, *m2, *older;
+ struct imd_tuple *t;
+
+ crc = kmalloc(sizeof(u16) * bio_sectors(data_bio), GFP_NOIO);
+ if (crc == NULL)
+ return -1;
+
+ crc_sectors_from_bio(data_bio, crc);
+
+ imd_buf = bvec_kmap_irq(bio_iovec(imd_bio), &flags);
+
+ m1 = (struct imd_sector_header *) imd_buf;
+ m2 = (struct imd_sector_header *) (imd_buf + 512);
+
+ older = older_imd(m1, m2);
+ if (older == NULL) {
+ bvec_kunmap_irq(imd_buf, &flags);
+ kfree(crc);
+ return -2;
+ }
+
+ t = (struct imd_tuple *) (older + 1);
+ t = t + data_bio->bi_sector % SECTORS_PER_IMD;
+
+ for (i = 0; i < bio_sectors(data_bio); i++) {
+ t->crc = *(crc + i);
+ t++;
+ }
+
+ older->last_updated += 2;
+ older->crc = crc_ccitt(0, (unsigned char *) (older + 1),
+ 512 - sizeof(struct imd_sector_header));
+
+ bvec_kunmap_irq(imd_buf, &flags);
+
+ kfree(crc);
+
+ imd_bio->bi_size = 512;
+ bio_iovec(imd_bio)->bv_len = 512;
+ if (older == m2) {
+ imd_bio->bi_sector++;
+ bio_iovec(imd_bio)->bv_offset = 512;
+ }
+
+ return 0;
+}
+
+/* Verify that the CRCs from data_bio match the ones stored in imd_bio (which
+ * contains both M1 and M2), and update data_bio's integrity information (if
+ * there is any) */
+/* TODO: choose a better name */
+static int verify_crc(struct bio *data_bio, struct bio *imd_bio)
+{
+ int i, r;
+ u16 *crc;
+ unsigned long flags;
+ unsigned char *imd_buf;
+ struct imd_sector_header *m1, *m2, *older, *newer;
+ struct imd_tuple *nt, *ot;
+
+ crc = kmalloc(sizeof(u16) * bio_sectors(data_bio), GFP_NOIO);
+ if (crc == NULL)
+ return -ENOMEM;
+
+ crc_sectors_from_bio(data_bio, crc);
+
+ imd_buf = bvec_kmap_irq(bio_iovec(imd_bio), &flags);
+
+ m1 = (struct imd_sector_header *) imd_buf;
+ m2 = (struct imd_sector_header *) (imd_buf + 512);
+
+ older = older_imd(m1, m2);
+ if (older == NULL) {
+ printk(KERN_WARNING "dm-csum: couldn't find older\n");
+ r = -ENOMEM;
+ goto exit;
+ }
+
+ newer = m1;
+ if (older == m1)
+ newer = m2;
+
+ nt = (struct imd_tuple *) (newer + 1);
+ nt += data_bio->bi_sector % SECTORS_PER_IMD;
+ ot = (struct imd_tuple *) (older + 1);
+ ot += data_bio->bi_sector % SECTORS_PER_IMD;
+
+ r = 0;
+
+ BUG_ON(bio_sectors(data_bio) > SECTORS_PER_IMD);
+
+ for (i = 0; i < bio_sectors(data_bio); i++) {
+ if (nt->crc == *(crc + i)) {
+ update_bi_info(data_bio, i, nt);
+ } else if (ot->crc == *(crc + i)) {
+ update_bi_info(data_bio, i, ot);
+
+ /* dprint("no match from new\n");
+ dprint(" new: %d %04x\n", newer->last_updated,
+ nt->crc);
+ dprint(" old: %d %04x\n", older->last_updated,
+ ot->crc);
+ dprint(" real: %04x\n", *(crc + i)); */
+ } else {
+ printk(KERN_WARNING
+ "dm-csum: CRC error at sector %lld\n",
+ (unsigned long long)
+ (data_bio->bi_sector + i));
+ dprint("CRC: %llu o:%x n:%x r:%x\n",
+ (unsigned long long)
+ (data_bio->bi_sector + i),
+ ot->crc, nt->crc, *(crc + i));
+ r = -EIO;
+ break;
+ }
+ nt++;
+ ot++;
+ }
+
+ /* TODO: validate the imd sector CRC */
+
+exit:
+ bvec_kunmap_irq(imd_buf, &flags);
+
+ kfree(crc);
+
+ return r;
+}
+
+
+/* Work queue where the read/write processing code is run.
+ * TODO: Unify with the submission workqueue once we have decent locking. */
+static struct workqueue_struct *io_wq;
+
+/*
+ * READ handling (nice bios only)
+ *
+ * Reads are handled by reading the requested data, and the imd sector
+ * associated with it. When both requests are completed, the data checksum is
+ * calculated and compared against what's in the imd sector.
+ */
+
+/* Used to track pending reads */
+struct pending_read {
+ struct dm_target *ti;
+ struct csum_c *cc;
+ struct bio *orig_bio;
+
+ struct bio *data_bio;
+ struct bio *imd_bio;
+
+ bool error;
+
+ /* number of operations pending */
+ atomic_t nr_pending;
+
+ struct work_struct work;
+};
+
+static void read_nice_bio(struct dm_target *ti, struct bio *bio);
+static struct bio *prepare_data_read(struct bio *orig_bio,
+ struct block_device *bdev, sector_t sector, bio_end_io_t *cb,
+ void *private);
+static void queue_read_complete(struct bio *bio, int error);
+static void read_complete(struct work_struct *work);
+
+/* Read a nice bio */
+static void read_nice_bio(struct dm_target *ti, struct bio *bio)
+{
+ struct csum_c *cc = ti->private;
+ struct pending_read *pr;
+
+ pr = kmalloc(sizeof(*pr), GFP_NOIO);
+ if (pr == NULL)
+ goto error;
+
+ pr->ti = ti;
+ pr->cc = cc;
+ pr->orig_bio = bio;
+ pr->error = false;
+
+ pr->data_bio = prepare_data_read(pr->orig_bio, cc->data_dev->bdev,
+ cc->map_data_sector(ti, pr->orig_bio->bi_sector),
+ queue_read_complete, pr);
+ if (pr->data_bio == NULL)
+ goto error;
+
+ pr->imd_bio = prepare_imd_read(cc->imd_dev->bdev,
+ cc->get_imd_sector(ti, pr->orig_bio->bi_sector),
+ queue_read_complete, pr);
+ if (pr->imd_bio == NULL)
+ goto error;
+
+ atomic_set(&pr->nr_pending, 2);
+
+ submit_bio(pr->data_bio->bi_rw, pr->data_bio);
+ submit_bio(pr->imd_bio->bi_rw, pr->imd_bio);
+ return;
+
+error:
+ bio_endio(bio, -ENOMEM);
+ return;
+}
+
+/* Prepare a new bio to read the data requested in orig_bio */
+static struct bio *prepare_data_read(struct bio *orig_bio,
+ struct block_device *bdev, sector_t sector, bio_end_io_t *cb,
+ void *private)
+{
+ struct bio *bio;
+
+ /* clone the bio so we don't override the original's bi_private and
+ * bi_end_io */
+ bio = bio_clone(orig_bio, GFP_NOIO);
+ if (bio == NULL)
+ return NULL;
+
+ bio->bi_bdev = bdev;
+ bio->bi_sector = sector;
+ bio->bi_end_io = cb;
+ bio->bi_private = private;
+
+ return bio;
+}
+
+static void queue_read_complete(struct bio *bio, int error)
+{
+ struct pending_read *pr = bio->bi_private;
+
+ if (error)
+ pr->error = true;
+
+ if (!atomic_dec_and_test(&pr->nr_pending))
+ return;
+
+ /* defer the completion so it's not run in interrupt context */
+ INIT_WORK(&(pr->work), read_complete);
+ queue_work(io_wq, &(pr->work));
+}
+
+static void read_complete(struct work_struct *work)
+{
+ int result = -EIO;
+ struct pending_read *pr;
+
+ pr = container_of(work, struct pending_read, work);
+
+ /* TODO: use decent locking */
+ up(&wq_lock);
+
+ /* it not only verifies the CRC, but also updates orig_bio's integrity
+ * information
+ * TODO: add an option for those who do not want the bio to fail on
+ * CRC errors */
+ /* XXX: should we update bip on failed bios? */
+ result = verify_crc(pr->orig_bio, pr->imd_bio);
+
+ if (pr->error)
+ result = -EIO;
+
+ /* free the page allocated in prepare_imd_read() */
+ __free_page(pr->imd_bio->bi_io_vec->bv_page);
+
+ /* XXX: is the ordering between this and bio_put(pr->data_bio)
+ * important? I think not, but confirmation wouldn't hurt */
+ bio_endio(pr->orig_bio, result);
+
+ bio_put(pr->data_bio);
+ bio_put(pr->imd_bio);
+
+ kfree(pr);
+}
+
+
+/*
+ * WRITE handling (nice bios only)
+ */
+
+/* Used to track pending writes */
+struct pending_write {
+ struct dm_target *ti;
+ struct csum_c *cc;
+
+ struct bio *orig_bio;
+ struct bio *imd_bio;
+ struct bio *data_bio;
+
+ bool error;
+ atomic_t nr_pending;
+
+ struct work_struct work1;
+ struct work_struct work2;
+};
+
+/* Writes begin with write_nice_bio(), which queues the imd bio read. When that
+ * bio is done, write_stage1() gets called, which updates the imd data and
+ * then queues both the imd write and the data write. When those are
+ * completed, write_stage2() gets called, which finishes up and ends the
+ * original bio. To avoid running the completion code in interrupt context,
+ * the stage functions run through a workqueue. */
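+/*
+ * The flow, roughly:
+ *
+ *   write_nice_bio(): submit the imd read
+ *     -> queue_write_stage1() (interrupt ctx): queue work
+ *       -> write_stage1(): update imd buffer, submit imd write + data write
+ *         -> queue_write_stage2() (interrupt ctx, x2): queue work when both done
+ *           -> write_stage2(): update bip, end the original bio
+ */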
+static void write_nice_bio(struct dm_target *ti, struct bio *bio);
+static void queue_write_stage1(struct bio *bio, int error);
+static void write_stage1(struct work_struct *work);
+static void queue_write_stage2(struct bio *bio, int error);
+static void write_stage2(struct work_struct *work);
+
+/* Write a nice bio */
+static void write_nice_bio(struct dm_target *ti, struct bio *bio)
+{
+ struct csum_c *cc = ti->private;
+ struct pending_write *pw;
+
+ pw = kmalloc(sizeof(*pw), GFP_NOIO);
+ if (pw == NULL) {
+ bio_endio(bio, -ENOMEM);
+ return;
+ }
+
+ pw->ti = ti;
+ pw->cc = cc;
+ pw->orig_bio = bio;
+ pw->data_bio = NULL;
+ pw->error = false;
+ atomic_set(&pw->nr_pending, 0);
+
+ pw->imd_bio = prepare_imd_read(cc->imd_dev->bdev,
+ cc->get_imd_sector(ti, pw->orig_bio->bi_sector),
+ queue_write_stage1, pw);
+ if (pw->imd_bio == NULL) {
+ kfree(pw);
+ bio_endio(bio, -ENOMEM);
+ return;
+ }
+
+ submit_bio(pw->imd_bio->bi_rw, pw->imd_bio);
+}
+
+static void queue_write_stage1(struct bio *imd_bio, int error)
+{
+ struct pending_write *pw = imd_bio->bi_private;
+
+ if (error)
+ pw->error = true;
+
+ INIT_WORK(&(pw->work1), write_stage1);
+ queue_work(io_wq, &(pw->work1));
+}
+
+static void write_stage1(struct work_struct *work)
+{
+ int r;
+ int err = -EIO;
+ struct bio *data_bio;
+ struct pending_write *pw;
+
+ pw = container_of(work, struct pending_write, work1);
+
+ //dprint("write stage 1 %llu\n", (unsigned long long) pw->orig_bio->bi_sector);
+
+ if (pw->error)
+ goto error;
+
+ r = update_imd_bio(pw->orig_bio, pw->imd_bio);
+ if (r == -1) {
+ err = -ENOMEM;
+ goto error;
+ } else if (r == -2) {
+ printk(KERN_WARNING "dm-csum: consistency error updating"
+ " imd sector\n");
+ err = -EIO;
+ goto error;
+ }
+
+ /* prepare bio for reuse */
+ pw->imd_bio->bi_rw |= WRITE;
+ pw->imd_bio->bi_end_io = queue_write_stage2;
+
+ data_bio = bio_clone(pw->orig_bio, GFP_NOIO);
+ if (data_bio == NULL) {
+ err = -ENOMEM;
+ goto error;
+ }
+
+ data_bio->bi_private = pw;
+ data_bio->bi_end_io = queue_write_stage2;
+ data_bio->bi_bdev = pw->cc->data_dev->bdev;
+ data_bio->bi_sector = pw->cc->map_data_sector(pw->ti,
+ pw->orig_bio->bi_sector);
+
+ /* data bio takes a barrier, so we know the imd write will have
+ * completed before it hits the disk */
+ /* TODO: the underlying device might not support barriers
+ * TODO: when data and imd are on separate devices, the barrier trick
+ * is no longer useful */
+ data_bio->bi_rw |= (1 << BIO_RW_BARRIER);
+
+ pw->data_bio = data_bio;
+
+ /* submit both bios at the end to simplify error handling; remember
+ * the order is very important because of the barrier */
+ atomic_set(&pw->nr_pending, 2);
+ submit_bio(pw->imd_bio->bi_rw, pw->imd_bio);
+ submit_bio(data_bio->bi_rw, data_bio);
+ return;
+
+error:
+ bio_endio(pw->orig_bio, err);
+ __free_page(pw->imd_bio->bi_io_vec->bv_page);
+ bio_put(pw->imd_bio);
+ kfree(pw);
+ return;
+}
+
+static void queue_write_stage2(struct bio *bio, int error)
+{
+ struct pending_write *pw = bio->bi_private;
+
+ if (error)
+ pw->error = true;
+
+ if (!atomic_dec_and_test(&pw->nr_pending))
+ return;
+
+ INIT_WORK(&(pw->work2), write_stage2);
+ queue_work(io_wq, &(pw->work2));
+}
+
+static void write_stage2(struct work_struct *work)
+{
+ struct pending_write *pw;
+
+ pw = container_of(work, struct pending_write, work2);
+
+ /* TODO: use decent locking */
+ up(&wq_lock);
+
+ if (bio_integrity(pw->orig_bio))
+ set_bi_from_imd(pw->orig_bio, pw->imd_bio);
+
+ /* free the imd_bio resources */
+ __free_page(pw->imd_bio->bi_io_vec->bv_page);
+ bio_put(pw->imd_bio);
+
+ /* XXX: like read_complete(): is the order between this and
+ * bio_put(pw->data_bio) important? */
+ bio_endio(pw->orig_bio, pw->error ? -EIO : 0);
+
+ bio_put(pw->data_bio);
+
+ kfree(pw);
+}
+
+
+/*
+ * Work queue to process bios.
+ *
+ * It is created in dm_csum_init() and handles the bios queued by
+ * queue_nice_bio(); the final stages of the bio processing (read_complete()
+ * and write_stage1()/write_stage2()) run on the separate io_wq workqueue.
+ *
+ * TODO: handle more than one pending bio, and dispatch more than one as long
+ * as they don't overlap. Maybe one workqueue per ctx? Or maybe delay the
+ * creation of the workqueue until the first ctx?
+ */
+
+static struct workqueue_struct *submit_wq;
+
+struct pending_work {
+ struct dm_target *ti;
+ struct bio *bio;
+ struct work_struct w;
+};
+
+static void process_nice_bio(struct work_struct *work)
+{
+ struct pending_work *pending;
+ struct dm_target *ti;
+ struct bio *bio;
+
+ pending = container_of(work, struct pending_work, w);
+
+ ti = pending->ti;
+ bio = pending->bio;
+
+ /* TODO: use decent locking
+ * At the moment, this lock is up()ed at the final stage of the
+ * read/write code, when the bio has been processed */
+ down(&wq_lock);
+
+ switch (bio_data_dir(bio)) {
+ case READ:
+ read_nice_bio(ti, bio);
+ break;
+ case WRITE:
+ write_nice_bio(ti, bio);
+ break;
+ default:
+ dprint("Unknown direction\n");
+ BUG();
+ break;
+ }
+
+ kfree(pending);
+}
+
+static int queue_nice_bio(struct dm_target *ti, struct bio *bio)
+{
+ struct pending_work *pending;
+
+ pending = kmalloc(sizeof(struct pending_work), GFP_NOIO);
+ if (pending == NULL)
+ return -ENOMEM;
+
+ pending->ti = ti;
+ pending->bio = bio;
+
+ INIT_WORK(&(pending->w), process_nice_bio);
+
+ queue_work(submit_wq, &(pending->w));
+
+ return 0;
+}
+
+
+/*
+ * Evil bio handling
+ *
+ * Evil bios are split into nice ones in a direction-independent way, and then
+ * go through the direction-dependent code (which is prepared to deal with
+ * nice bios only, because it makes the code much simpler).
+ *
+ * When all the nice bios are completed, we end the original, evil bio.
+ */
+
+/* Determines if a bio is evil */
+static int bio_is_evil(struct dm_target *ti, struct bio *bio)
+{
+ sector_t mapped_first, mapped_last;
+
+ /* To detect when a bio is evil, we check whether the span of its mapped
+ * sectors is larger than the span of its own sectors */
+ mapped_first = map_data_sector_same(ti, bio->bi_sector);
+ mapped_last = map_data_sector_same(ti,
+ bio->bi_sector + bio_sectors(bio) - 1);
+
+ return (mapped_last - mapped_first) != (bio_sectors(bio) - 1);
+}
+
+
+/* Used to track pending evil bios */
+struct pending_evil_bio {
+ struct csum_c *cc;
+
+ /* original evil bio */
+ struct bio *orig_bio;
+
+ /* number of bios pending */
+ atomic_t nr_pending;
+
+ /* were there any errors? */
+ bool error;
+
+};
+
+static int handle_evil_bio(struct dm_target *ti, struct bio *bio);
+static struct bio *prepare_nice_bio(struct pending_evil_bio *peb,
+ struct bio *bio, sector_t begin, sector_t size);
+static void evil_bio_complete(struct bio *bio, int error);
+
+/* Handle an evil bio, by splitting it into nice ones and processing them */
+static int handle_evil_bio(struct dm_target *ti, struct bio *bio)
+{
+ int i, r;
+ sector_t first, last, prelude, postlude;
+ unsigned int nmiddle, submitted_bios, expected_bios;
+ struct pending_evil_bio *peb;
+ struct bio *new;
+
+ /*
+ dprint("evil bio! s:%lu n:%lu l:%lu d:%d \ti:%lu o:%lu\t\tp:%p\n",
+ bio->bi_sector, bio_sectors(bio), bio->bi_size,
+ bio_data_dir(bio),
+ bio->bi_idx, bio_iovec(bio)->bv_offset,
+ bio_iovec(bio)->bv_page);
+ */
+
+ peb = kmalloc(sizeof(*peb), GFP_NOIO);
+ if (peb == NULL)
+ return -ENOMEM;
+
+ peb->orig_bio = bio;
+ peb->error = false;
+ peb->cc = ti->private;
+
+ /* We will split the bio in:
+ * - optionally a "prelude bio" of sectors <= SECTORS_PER_IMD
+ * - 0 or more "middle bios" sectors == SECTORS_PER_IMD
+ * - a "postlude bio" <= SECTORS_PER_IMD
+ *
+ * TODO: there's room to simplify this math; we're keeping it explicit
+ * for now
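+ *
+ * For example (numbers are only illustrative): a 130-sector bio starting
+ * at sector 50 is cut into a 12-sector prelude (sectors 50..61), one
+ * 62-sector middle chunk (62..123) and a 56-sector postlude (124..179).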
+ */
+ first = bio->bi_sector;
+ last = bio->bi_sector + bio_sectors(bio);
+
+ /* How many sectors until the first cut */
+ prelude = dm_sector_div_up(first, SECTORS_PER_IMD)
+ * SECTORS_PER_IMD - first;
+
+ /* How many sectors from the last cut until last */
+ postlude = last - (dm_sector_div_up(last, SECTORS_PER_IMD) - 1)
+ * SECTORS_PER_IMD;
+
+ /* How many SECTORS_PER_IMD are between the first and last cuts */
+ nmiddle = ( (last - postlude) - (first + prelude) ) / SECTORS_PER_IMD;
+
+ /* the prelude may be empty if the bio starts exactly at an imd group
+ * boundary, so only count it when it's there */
+ expected_bios = (prelude ? 1 : 0) + nmiddle + 1;
+ atomic_set(&peb->nr_pending, expected_bios);
+
+ /*
+ dprint(" first:%lu last:%lu pre:%lu nm:%lu post:%lu pending:%lu\n",
+ first, last, prelude, nmiddle, postlude,
+ peb->nr_pending);
+ */
+
+ submitted_bios = 0;
+
+ /* From now on, access to peb will be locked to avoid races with
+ * evil_bio_complete() */
+
+ /* Submit the prelude bio */
+ if (prelude) {
+ new = prepare_nice_bio(peb, bio, first, prelude);
+ if (new == NULL) {
+ kfree(peb);
+ return -ENOMEM;
+ }
+
+ r = queue_nice_bio(ti, new);
+ if (r < 0)
+ goto prepare_error;
+
+ submitted_bios++;
+ }
+
+ /* Submit the middle bios */
+ for (i = 0; i < nmiddle; i++) {
+ new = prepare_nice_bio(peb, bio,
+ first + prelude + i * SECTORS_PER_IMD,
+ SECTORS_PER_IMD);
+ if (new == NULL)
+ goto prepare_error;
+
+ r = queue_nice_bio(ti, new);
+ if (r < 0)
+ goto prepare_error;
+
+ submitted_bios++;
+ }
+
+ /* Submit the postlude bio */
+ new = prepare_nice_bio(peb, bio, (last - postlude), postlude);
+ if (new == NULL) {
+ goto prepare_error;
+ }
+ r = queue_nice_bio(ti, new);
+ if (r < 0)
+ goto prepare_error;
+
+ submitted_bios++;
+
+ return 0;
+
+prepare_error:
+ /* There was an error in prepare_nice_bio(), but we already have some
+ * in-flight bios that have been submitted and will call
+ * evil_bio_complete() when they're done; decrement the expected
+ * number of bios, and check if we're already done */
+ atomic_sub(expected_bios - submitted_bios, &peb->nr_pending);
+ peb->error = true;
+
+ if (atomic_read(&peb->nr_pending) == 0) {
+ kfree(peb);
+ return -ENOMEM;
+ }
+
+ return 0;
+}
+
+/* Prepare a new nice bio cloned from the original one */
+static struct bio *prepare_nice_bio(struct pending_evil_bio *peb,
+ struct bio *bio, sector_t begin, sector_t size)
+{
+ int segno, advance, sofar;
+ struct bio *new;
+ struct bio_vec *bvec;
+
+ new = bio_clone(bio, GFP_NOIO);
+ if (new == NULL)
+ return NULL;
+
+ new->bi_sector = begin;
+ new->bi_size = size * 512;
+
+ WARN_ON(bio_sectors(new) != size);
+
+ /* Make the new bio start in the right idx and offset
+ * TODO: this can be optimized because we're walking the same thing
+ * over and over */
+
+ advance = (begin - bio->bi_sector) * 512;
+ sofar = 0;
+ segno = 0; /* will be set to bio->bi_idx by bio_for_each_segment */
+ bio_for_each_segment(bvec, new, segno) {
+ if (sofar + bvec->bv_len > advance) {
+ break;
+ }
+
+ sofar += bvec->bv_len;
+ }
+
+ new->bi_idx = segno;
+ bio_iovec(new)->bv_offset += advance - sofar;
+ bio_iovec(new)->bv_len =
+ min(new->bi_size, bio_iovec(new)->bv_len - (advance - sofar));
+
+ new->bi_private = peb;
+ new->bi_end_io = evil_bio_complete;
+
+ /* trim it so that the new bip_vec (which is shared with the original
+ * bio) points to the right offset */
+ if (bio_integrity(bio))
+ bio_integrity_trim(new, begin - bio->bi_sector, size);
+
+ return new;
+}
+
+static void evil_bio_complete(struct bio *bio, int error)
+{
+ struct pending_evil_bio *peb = bio->bi_private;
+
+ if (error)
+ peb->error = true;
+
+ if (atomic_dec_and_test(&peb->nr_pending)) {
+ bio_endio(peb->orig_bio, peb->error ? -EIO : 0);
+ kfree(peb);
+ }
+
+ /* put the bio created with bio_clone() because we no longer care
+ * about it */
+ bio_put(bio);
+}
+
+
+/*
+ * Device mapper
+ */
+
+/* Constructor: <data dev path> <data dev offset> \
+ * [ <integrity dev path> <integrity dev offset> ] */
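+/* Example table lines (illustrative only; sector counts and devices made up):
+ *   0 63488 csum /dev/sdb 0
+ *   0 63488 csum /dev/sdb 0 /dev/sdc 0
+ * The second form keeps the integrity metadata on a separate device. */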
+static int csum_ctr(struct dm_target *ti, unsigned int argc, char **argv)
+{
+ int err;
+ fmode_t mode;
+ unsigned long long data_offset, imd_offset;
+ sector_t data_dev_len;
+ struct csum_c *cc;
+
+ if (argc != 2 && argc != 4) {
+ ti->error = "Incorrect number of arguments";
+ return -EINVAL;
+ }
+
+ cc = kmalloc(sizeof(*cc), GFP_KERNEL);
+ if (cc == NULL) {
+ ti->error = "Cannot allocate context information";
+ return -ENOMEM;
+ }
+ cc->data_dev = cc->imd_dev = NULL;
+ cc->data_start = cc->imd_start = cc->imd_len = 0;
+
+ err = -EINVAL;
+
+ if (sscanf(argv[1], "%llu", &data_offset) != 1) {
+ ti->error = "Invalid data dev offset";
+ goto error;
+ }
+ cc->data_start = data_offset;
+
+ /* If we have both data and metadata on the same device, the
+ * advertised size of the dm device will be slightly less than the
+ * total, to account for the space dedicated to the metadata */
+ if (argc == 2) {
+ data_dev_len = ti->len + imd_sectors_needed(ti->len);
+ } else {
+ data_dev_len = ti->len;
+ }
+
+ mode = dm_table_get_mode(ti->table);
+ if (dm_get_device(ti, argv[0], cc->data_start, data_dev_len, mode,
+ &(cc->data_dev))) {
+ ti->error = "data device lookup failed";
+ goto error;
+ }
+
+ if (argc == 2) {
+ cc->map_data_sector = map_data_sector_same;
+ cc->get_imd_sector = get_imd_sector_same;
+ cc->imd_dev = cc->data_dev;
+ } else if (argc == 4) {
+ if (sscanf(argv[3], "%llu", &imd_offset) != 1) {
+ ti->error = "Invalid integrity dev offset";
+ goto error;
+ }
+ cc->imd_start = imd_offset;
+ cc->imd_len = imd_sectors_needed(ti->len);
+
+ if (dm_get_device(ti, argv[2], cc->imd_start,
+ cc->imd_len, mode, &(cc->imd_dev))) {
+ ti->error = "Integrity device lookup failed";
+ goto error;
+ }
+
+ cc->map_data_sector = map_data_sector_diff;
+ cc->get_imd_sector = get_imd_sector_diff;
+ }
+
+ ti->private = cc;
+
+ if (bi_register(ti) != 0) {
+ ti->error = "Couldn't register with bio-integrity";
+ goto error;
+ }
+
+ return 0;
+
+error:
+ if (cc->data_dev) {
+ if (cc->data_dev == cc->imd_dev) {
+ dm_put_device(ti, cc->data_dev);
+ } else {
+ dm_put_device(ti, cc->data_dev);
+ dm_put_device(ti, cc->imd_dev);
+ }
+ }
+ kfree(cc);
+ return err;
+}
+
+/* Destructor, undoes what was done in the constructor */
+static void csum_dtr(struct dm_target *ti)
+{
+ struct csum_c *cc = ti->private;
+
+ bi_unregister(ti);
+
+ if (cc->data_dev == cc->imd_dev) {
+ dm_put_device(ti, cc->data_dev);
+ } else {
+ dm_put_device(ti, cc->data_dev);
+ dm_put_device(ti, cc->imd_dev);
+ }
+
+ kfree(cc);
+}
+
+/* Operation mapping */
+static int csum_map(struct dm_target *ti, struct bio *bio,
+ union map_info *map_context)
+{
+ int rv;
+
+ if (bio_is_evil(ti, bio))
+ rv = handle_evil_bio(ti, bio);
+ else
+ rv = queue_nice_bio(ti, bio);
+
+ if (rv < 0)
+ return rv;
+
+ return DM_MAPIO_SUBMITTED;
+}
+
+
+/*
+ * Target registration and module stuff
+ */
+
+static struct target_type csum_target = {
+ .name = "csum",
+ .version = {1, 0, 0},
+ .module = THIS_MODULE,
+ .ctr = csum_ctr,
+ .dtr = csum_dtr,
+ .map = csum_map,
+};
+
+static int __init dm_csum_init(void)
+{
+ int dm_rv;
+
+ submit_wq = create_workqueue("dm-csum-s");
+ if (submit_wq == NULL)
+ return -ENOMEM;
+
+ io_wq = create_workqueue("dm-csum-io");
+ if (io_wq == NULL) {
+ destroy_workqueue(submit_wq);
+ return -ENOMEM;
+ }
+
+ dm_rv = dm_register_target(&csum_target);
+ if (dm_rv < 0) {
+ DMERR("register failed: %d", dm_rv);
+ destroy_workqueue(submit_wq);
+ destroy_workqueue(io_wq);
+ return dm_rv;
+ }
+
+ return 0;
+}
+
+static void __exit dm_csum_exit(void)
+{
+ dm_unregister_target(&csum_target);
+ destroy_workqueue(submit_wq);
+ destroy_workqueue(io_wq);
+}
+
+module_init(dm_csum_init)
+module_exit(dm_csum_exit)
+
+MODULE_AUTHOR("Alberto Bertogli <albertito@xxxxxxxxxxxxxx>");
+MODULE_DESCRIPTION(DM_NAME " checksumming I/O target");
+MODULE_LICENSE("GPL v2");
+