Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
default.c 10.28 KiB
#define _FASTWRITER_DEFAULT_C

#define _GNU_SOURCE
#define _XOPEN_SOURCE 600
#define _POSIX_C_SOURCE 200112L
#define _LARGEFILE64_SOURCE

#include "config.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <limits.h>
#include <errno.h>

#include <pthread.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>

#include <fcntl.h>


#ifdef HAVE_LINUX_FALLOC_H
# include <linux/falloc.h>
#endif /* HAVE_LINUX_FALLOC_H */

#ifndef DISABLE_XFS_REALTIME
# include <xfs/xfs.h>
#endif /* !DISABLE_XFS_REALTIME */


#include "fastwriter.h"
#include "private.h"
#include "sysinfo.h"
#include "default.h"

/* When defined, fastwriter_default_open() starts with sync_mode = 1 (O_DIRECT)
 * before the per-filesystem overrides are applied. */
#define SYNC_MODE
/* NOTE(review): not referenced by the code in this file, which tests
 * HAVE_LINUX_FALLOC_H instead - confirm whether this define is still needed. */
#define HAVE_FALLOCATE
/* Write granularity (4 MiB) used for raw, ext4, btrfs and xfs targets,
 * and for ocfs2 when AIO is compiled out. */
#define EXT4_WRITEBLOCK 4194304
/* Preallocation step (1 GiB) used for ext4, btrfs, xfs and ocfs2. */
#define EXT4_PREALLOCATE 1073741824
/* Write granularity (256 KiB) used for ocfs2 in AIO mode and for any
 * filesystem that is not explicitly recognised. */
#define OCFS_WRITEBLOCK 262144
/* Number of iocb slots, also passed to io_queue_init() as the queue size. */
#define AIO_QUEUE_LENGTH 4
/* Number of entries in the per-request bookkeeping ring (data[]);
 * a compile-time check in this file rejects values below AIO_QUEUE_LENGTH. */
#define AIO_BUFFERS 8


#ifndef DISABLE_AIO
# include <libaio.h>
# if AIO_QUEUE_LENGTH > AIO_BUFFERS
#  error "AIO_QUEUE_LENGTH > AIO_BUFFERS"
# endif
#endif /* DISABLE_AIO */


#ifndef DISABLE_AIO
/**
 * Bookkeeping record for a single submitted AIO write.
 * A pointer to this record is attached to the iocb at submission and read
 * back from the completion event.
 */
typedef struct {
    /** file offset the request was submitted at */
    size_t offset;
    /** number of bytes submitted with the request */
    size_t size;
    /** value recorded at submission and pushed back onto ios_ready[] when the request completes */
    int ios;
    int ready;			/**< 0 - unused, 1 - processing, 2 - done */
} fastwriter_data_t;
#endif /* !DISABLE_AIO */

/**
 * Private state of the default writer backend.
 * Allocated by fastwriter_default_open(), stored in fw->ctx and released by
 * fastwriter_default_close().
 */
typedef struct {
    int fd;			/**< descriptor of the output file */

    int sync_mode;		/**< Open with O_DIRECT flag to avoid caches */
    int aio_mode;		/**< Use kernel AIO (libaio.h) */
    
    size_t prior_size;		/**< original size of file; (size_t)-1 disables the final truncation in close (set for "raw" targets) */
    size_t preallocated;	/**< preallocated bytes */
    
    size_t wr_block;		/**< minimal block of data to write */
    size_t pa_block;		/**< preallocation step; 0 disables preallocation */

#ifndef DISABLE_AIO
    io_context_t aio;		/**< libaio context created by io_queue_init() */
    
    int ios_ready_n;		/**< number of valid entries in ios_ready[] */
    int ios_ready[AIO_QUEUE_LENGTH];	/**< stack of iocb indices currently free for submission */
    struct iocb ios[AIO_QUEUE_LENGTH];	/**< iocb slots handed to io_submit() */

    int data_head, data_tail;	/**< ring indices into data[]: next entry to fill / oldest entry not yet retired */
    fastwriter_data_t data[AIO_BUFFERS];	/**< ring of per-request records, retired in submission order */
    
    int ios_status[AIO_QUEUE_LENGTH];	/**< NOTE(review): not referenced by the functions in this file */
    
    size_t sched;		/**< how far we are ahead of the currently written head (bytes submitted but not yet reported as written) */
    size_t fd_offset;		/**< current file offset */

    int page_size;		/**< result of getpagesize(), used to validate the size of a trailing short block */
#endif /* !DISABLE_AIO */
} fastwriter_default_t;


/**
 * Opens the output file and initialises the default writer backend.
 *
 * The filesystem type reported by fastwriter_get_file_fs() selects the write
 * block size, the preallocation step and whether O_DIRECT and/or kernel AIO
 * are used. If the file is not being overwritten and is not a "raw" target,
 * the current file size is recorded in prior_size and the file position is
 * left at the end of the file; direct I/O and AIO are turned off when that
 * size is not a multiple of FASTWRITER_SYNCIO_ALIGN.
 *
 * @param fw    writer handle; on success (and on failures after the context
 *              has been allocated) fw->ctx points to the backend context,
 *              which fastwriter_default_close() releases
 * @param name  path of the file or device to write to
 * @param flags FASTWRITER_FLAGS_OVERWRITE truncates an existing file
 * @return 0 on success, the error returned by fastwriter_get_file_fs(),
 *         ENOMEM, or the errno value of a failed open()
 */
int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags_t flags) {
    int err;
    char fs[16];

#ifndef DISABLE_XFS_REALTIME
    struct fsxattr attr;
#endif /* !DISABLE_XFS_REALTIME */

    int open_flags = (O_CREAT|O_WRONLY|O_NOATIME|O_LARGEFILE);
    int open_mode = (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);

    fastwriter_default_t *ctx;

    err = fastwriter_get_file_fs(name, sizeof(fs) - 1, fs);
    if (err) return err;

    /* calloc gives a fully zeroed context (all modes off, empty AIO state) */
    ctx = calloc(1, sizeof(*ctx));
    if (!ctx) return ENOMEM;

    fw->ctx = ctx;

#ifdef SYNC_MODE
    ctx->sync_mode = 1;
#endif /* SYNC_MODE */

    ctx->prior_size = 0;

    if (!strcmp(fs, "raw")) {
        ctx->wr_block = EXT4_WRITEBLOCK;
        ctx->pa_block = 0;
        /* (size_t)-1 tells close() not to truncate */
        ctx->prior_size = (size_t)-1;
        open_flags &= ~(O_CREAT|O_NOATIME|O_LARGEFILE);
    } else if (!strcmp(fs, "ext4")) {
        ctx->wr_block = EXT4_WRITEBLOCK;
        ctx->pa_block = EXT4_PREALLOCATE;
    } else if (!strcmp(fs, "btrfs")) {
        ctx->wr_block = EXT4_WRITEBLOCK;
        ctx->pa_block = EXT4_PREALLOCATE;
    } else if (!strcmp(fs, "xfs")) {
        ctx->wr_block = EXT4_WRITEBLOCK;
        ctx->pa_block = EXT4_PREALLOCATE;
    } else if (!strcmp(fs, "ocfs2")) {
#ifndef DISABLE_AIO
        ctx->aio_mode = 1;
        ctx->sync_mode = 0;
        ctx->wr_block = OCFS_WRITEBLOCK;
#else /* !DISABLE_AIO */
        ctx->wr_block = EXT4_WRITEBLOCK;
#endif /* !DISABLE_AIO */
        ctx->pa_block = EXT4_PREALLOCATE;
/* Disabled special cases; these filesystems currently take the generic branch below.
    } else if (!strcmp(fs, "fhgfs")) {
        ctx->sync_mode = 0;
        ctx->wr_block = OCFS_WRITEBLOCK;
        ctx->pa_block = EXT4_PREALLOCATE;
    } else if (strstr(fs, "gluster")) {
        ctx->sync_mode = 0;
        ctx->wr_block = OCFS_WRITEBLOCK;
        ctx->pa_block = EXT4_PREALLOCATE;*/
    } else {
        ctx->sync_mode = 0;
        ctx->wr_block = OCFS_WRITEBLOCK;
        ctx->pa_block = 0;
    }

    if (ctx->sync_mode) {
        open_flags |= O_DIRECT;
    }

    if (flags&FASTWRITER_FLAGS_OVERWRITE)
        open_flags |= O_TRUNC;

    ctx->fd = open(name, open_flags, open_mode);
    if (ctx->fd < 0) {
        // Running as normal user, try to disable direct mode
        if ((errno == EINVAL)&&(ctx->sync_mode)) {
            ctx->sync_mode = 0;
            open_flags &= ~O_DIRECT;
            ctx->fd = open(name, open_flags, open_mode);
        }
        if (ctx->fd < 0) return errno;
    }

    /* The overwrite request lives in the caller's `flags`, not in the
     * open(2) flag word, so that is what must be tested here. */
    if (((flags&FASTWRITER_FLAGS_OVERWRITE)==0)&&(strcmp(fs, "raw"))) {
        /* Seeking to the end both measures the existing data and positions
         * subsequent writes after it. A failed lseek64() stores (size_t)-1. */
        ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END);

        if (ctx->prior_size%FASTWRITER_SYNCIO_ALIGN) {
            /* Existing data does not end on an aligned boundary:
             * reopen without O_DIRECT and give up on sync/AIO modes. */
            close(ctx->fd);

            ctx->fd = open(name, open_flags&~O_DIRECT, open_mode);
            if (ctx->fd < 0) return errno;

            ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END);

            ctx->sync_mode = 0;
            ctx->aio_mode = 0;
        }
    }

#ifndef DISABLE_XFS_REALTIME
    if (!strcmp(fs, "xfs")) {
        /* Best effort: a failure to read the attributes is ignored,
         * a failure to set them is only reported. */
        err = xfsctl (name, ctx->fd, XFS_IOC_FSGETXATTR, (void *) &attr);
        if (!err) {
            attr.fsx_xflags |= XFS_XFLAG_REALTIME;
            err = xfsctl (name, ctx->fd, XFS_IOC_FSSETXATTR, (void *) &attr);
            if (err) fprintf(stderr, "Error initializing XFS real-time mode (%i), disabling...\n", err);
        }
    }
#endif /* !DISABLE_XFS_REALTIME */

#ifndef DISABLE_AIO
    if (ctx->aio_mode) {
        int i;
        ctx->page_size = getpagesize();
        ctx->fd_offset = ctx->prior_size;

        /* Initially every iocb slot is free */
        ctx->ios_ready_n = AIO_QUEUE_LENGTH;
        for (i = 0; i < AIO_QUEUE_LENGTH; i++) {
            ctx->ios_ready[i] = i;
        }

        err = io_queue_init(AIO_QUEUE_LENGTH, &ctx->aio);
        if (err) {
            /* Fall back to plain write() instead of failing the open */
            fprintf(stderr, "Error initializing AIO mode (%i), disabling...\n", -err);
            ctx->aio_mode = 0;
        }
    }
#endif /* !DISABLE_AIO */

    ctx->preallocated = 0;

    return 0;
}


/**
 * Shuts down the default writer backend and releases its context.
 *
 * In AIO mode the function first waits for the outstanding requests and
 * releases the AIO queue. Unless prior_size is (size_t)-1, the file is then
 * truncated to prior_size + fw->written (the write path may pad the last
 * block up to FASTWRITER_SYNCIO_ALIGN in sync/AIO mode). When
 * HAVE_LINUX_FALLOC_H is defined the truncation is unconditional; otherwise
 * it is done only in sync or AIO mode. Finally the descriptor is closed, the
 * context is freed and fw->ctx is reset to NULL.
 *
 * The function cannot return an error; failures are reported on stderr.
 * Calling it with fw->ctx == NULL is a no-op.
 *
 * @param fw writer handle whose ctx was set by fastwriter_default_open()
 */
void fastwriter_default_close(fastwriter_t *fw) {
    fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;
    int trim;

    if (!ctx) return;

    if (ctx->fd >= 0) {
#ifndef DISABLE_AIO
        if ((ctx->aio_mode)&&(ctx->aio)) {
            int n_ev;
            struct io_event ev[AIO_QUEUE_LENGTH];

            /* Every completed request frees one slot; wait until all are free */
            while (ctx->ios_ready_n < AIO_QUEUE_LENGTH) {
                n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL);
                if (n_ev <= 0) {
                    fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev);
                    break;
                }
                ctx->ios_ready_n += n_ev;
            }

            io_queue_release(ctx->aio);
        }
#endif /* DISABLE_AIO */

#ifdef HAVE_LINUX_FALLOC_H
        trim = (ctx->prior_size != (size_t)-1);
#else /* HAVE_LINUX_FALLOC_H */
        trim = ((ctx->prior_size != (size_t)-1)&&((ctx->sync_mode)||(ctx->aio_mode)));
#endif /* HAVE_LINUX_FALLOC_H */

        if (trim) {
            /* A failed truncation leaves the file longer than the payload,
             * so it must not pass unnoticed. */
            if (ftruncate(ctx->fd, ctx->prior_size + fw->written)) {
                fprintf(stderr, "Error truncating the file to its final size (%s)\n", strerror(errno));
            }
        }

        close(ctx->fd);
    }

    free(ctx);
    fw->ctx = NULL;
}


/**
 * Writes (part of) the caller's buffer to the output file.
 *
 * Unless FASTWRITER_WRITE_FLAG_FORCE is set, only whole multiples of
 * wr_block are consumed; a request shorter than wr_block is deferred
 * (*written = 0, return 0). Preallocation is extended by pa_block when the
 * data would cross the preallocated boundary; if preallocation fails it is
 * disabled for the rest of the session. In sync/AIO mode a size that is not
 * a multiple of FASTWRITER_SYNCIO_ALIGN is padded up to the alignment, which
 * means bytes beyond `size` are read from `data`.
 *
 * In AIO mode blocks are queued asynchronously and the call returns as soon
 * as at least one block at the tail of the queue has completed; *written is
 * the number of completed bytes, while further blocks may still be in
 * flight (tracked in ctx->sched). NOTE(review): this presumes the caller
 * advances `data` by *written between calls and keeps the buffer valid
 * until the data has been reported as written - confirm against the caller.
 *
 * @param fw      writer handle; fw->ctx is the context set by fastwriter_default_open()
 * @param flags   FASTWRITER_WRITE_FLAG_FORCE to write a partial block
 * @param size    number of bytes available at data
 * @param data    source buffer
 * @param written [out] number of bytes consumed from data
 * @return 0 on success; otherwise a non-zero error code (errno of write(),
 *         negated libaio error, EIO for a short AIO write, or -1 when the
 *         trailing AIO block is not a multiple of the page size)
 */
int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, size_t size, void *data, size_t *written) {
    size_t sum = 0;
    size_t delta = 0;
    ssize_t res;
    char *bytes = (char*)data;	/* byte-wise view; arithmetic on void* is not standard C */
    fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;

    if ((flags&FASTWRITER_WRITE_FLAG_FORCE)==0) {
        if (size < ctx->wr_block) {
            *written = 0;
            return 0;
        }

        size -= size % ctx->wr_block;
    }

    if ((ctx->pa_block)&&((fw->written + size) > ctx->preallocated)) {
        int pa_err;
#ifdef HAVE_LINUX_FALLOC_H
        pa_err = fallocate(ctx->fd, FALLOC_FL_KEEP_SIZE, ctx->preallocated, ctx->pa_block);
#else /* HAVE_LINUX_FALLOC_H */
        pa_err = posix_fallocate(ctx->fd, ctx->preallocated, ctx->pa_block);
#endif /* HAVE_LINUX_FALLOC_H */
        if (pa_err) {
            /* Preallocation is not available here, stop trying */
            ctx->pa_block = 0;
        } else {
            ctx->preallocated += ctx->pa_block;
        }
    }

    // we expect this to happen only at last iteration (buffer is multiply of the required align)
    if (((ctx->aio_mode)||(ctx->sync_mode))&&(size%FASTWRITER_SYNCIO_ALIGN)) {
        delta = FASTWRITER_SYNCIO_ALIGN - size%FASTWRITER_SYNCIO_ALIGN;
    }

#ifndef DISABLE_AIO
    if (ctx->aio_mode) {
        int err;
        size_t done = 0;	/* bytes retired from the tail of the ring during this call */
        size_t sched = 0;	/* bytes submitted during this call */

        fastwriter_data_t *iodata;
        struct iocb *newio;
        size_t wr_block = ctx->wr_block;

        /* Make the output defined on every error return below */
        *written = 0;

        do {
            /* Is there still unsubmitted data, and a free ring entry to track it? */
            int pending = ((ctx->sched + sched) < size);
            int ring_full = ((ctx->data_head == ctx->data_tail)&&(ctx->data[ctx->data_head].ready));

            /* Wait for completions when no iocb is free, and also when nothing
             * can be submitted right now: otherwise the loop would spin
             * forever without ever collecting the events it is waiting for. */
            if ((!ctx->ios_ready_n)||(!pending)||(ring_full)) {
                int i, n_ev;
                struct io_event ev[AIO_QUEUE_LENGTH];

                /* Nothing is in flight, so no event will ever arrive */
                if (ctx->ios_ready_n == AIO_QUEUE_LENGTH) break;

                n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL);
                if (n_ev <= 0) {
                    fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev);
                    return n_ev?-n_ev:EIO;
                }

                for (i = 0; i < n_ev; i++) {
                    fastwriter_data_t *ev_data = (fastwriter_data_t *)(ev[i].data);
                    /* libaio reports failures as negated errno values; view the
                     * results as signed so that they are not mistaken for
                     * huge byte counts. */
                    long ev_res = (long)ev[i].res;
                    long ev_res2 = (long)ev[i].res2;

                    if ((ev_res2)||(ev_res < 0)||((size_t)ev_res < ev_data->size)) {
                        fprintf(stderr, "AIO write failed (res: %li, res2: %li, expected: %zu), no handling data will be corrupted...\n", ev_res, ev_res2, ev_data->size);
                        if (ev_res2) return (int)-ev_res2;
                        if (ev_res < 0) return (int)-ev_res;
                        return EIO;	/* short write */
                    }

                    /* Return the iocb slot to the free stack */
                    ctx->ios_ready[ctx->ios_ready_n++] = ev_data->ios;
                    ev_data->ready = 2;
                }

                /* Retire completed requests strictly in submission order */
                while (ctx->data[ctx->data_tail].ready > 1) {
                    ctx->data[ctx->data_tail].ready = 0;

                    done += ctx->data[ctx->data_tail].size;
                    if ((++ctx->data_tail) == AIO_BUFFERS) ctx->data_tail = 0;
                }

                ring_full = ((ctx->data_head == ctx->data_tail)&&(ctx->data[ctx->data_head].ready));
            }

            /* At this point at least one iocb slot is free */
            if ((pending)&&(!ring_full)) {
                int slot;

                if (wr_block > ((size + delta) - (ctx->sched + sched))) {
                    wr_block = (size + delta) - (ctx->sched + sched);
                    if (wr_block % ctx->page_size) {
                        fprintf(stderr, "We need to write incomplete page (%zu bytes). This is no supported yet...\n", wr_block);
                        return -1;
                    }
                }

                slot = ctx->ios_ready[--ctx->ios_ready_n];
                newio = &ctx->ios[slot];
                iodata = &ctx->data[ctx->data_head];

                iodata->offset = ctx->fd_offset;
                iodata->size = wr_block;
                /* Remember the iocb index itself (not the stack depth), so the
                 * right slot is returned to the free stack on completion. */
                iodata->ios = slot;

                io_prep_pwrite(newio, ctx->fd, bytes + ctx->sched + sched, wr_block, ctx->fd_offset);
                io_set_callback(newio, (void*)iodata);
                err = io_submit(ctx->aio, 1, &newio);
                if (err != 1) {
                    fprintf(stderr, "Error submiting AIO job (%i)\n", -err);
                    return -err;
                }

                iodata->ready = 1;
                sched += wr_block;
                ctx->fd_offset += wr_block;
                if ((++ctx->data_head) == AIO_BUFFERS) ctx->data_head = 0;
            }
        } while (!done);

        /* Carry over what is still in flight; report only what has completed */
        ctx->sched += sched - done;
        size = done;
    } else {
#endif /* !DISABLE_AIO */
        do {
            res = write(ctx->fd, bytes + sum, size + delta - sum);
            if (res < 0) {
                *written = sum;
                return errno;
            }

            sum += res;
        } while (sum < size);
#ifndef DISABLE_AIO
    }
#endif /* !DISABLE_AIO */

    if ((ctx->sync_mode)||(ctx->aio_mode)) {
        /* Advisory only; the result is intentionally ignored */
        posix_fadvise(ctx->fd, fw->written, size, POSIX_FADV_DONTNEED);
    }

    *written = size;

    return 0;
}