diff --git a/src/archive.c b/src/archive.c index 0f32d9345..6fc920a1b 100644 --- a/src/archive.c +++ b/src/archive.c @@ -15,14 +15,17 @@ static int push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_dir, const char *archive_dir, bool overwrite, bool no_sync, - uint32 archive_timeout); + uint32 archive_timeout, xlogFileType type); #ifdef HAVE_LIBZ static int push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, - const char *archive_dir, bool overwrite, bool no_sync, - int compress_level, uint32 archive_timeout); + const char *archive_dir, bool overwrite, bool no_sync, + int compress_level, uint32 archive_timeout, xlogFileType type); #endif static void *push_files(void *arg); static void *get_files(void *arg); +static bool +get_wal_file_wrapper(const char *filename, const char *archive_root_dir, + const char *to_fullpath, bool prefetch_mode); static bool get_wal_file(const char *filename, const char *from_path, const char *to_path, bool prefetch_mode); static int get_wal_file_internal(const char *from_path, const char *to_path, FILE *out, @@ -89,8 +92,9 @@ typedef struct typedef struct WALSegno { - char name[MAXFNAMELEN]; - volatile pg_atomic_flag lock; + char name[MAXFNAMELEN]; + volatile pg_atomic_flag lock; + xlogFileType type; } WALSegno; static int push_file(WALSegno *xlogfile, const char *archive_status_dir, @@ -101,6 +105,29 @@ static int push_file(WALSegno *xlogfile, const char *archive_status_dir, static parray *setup_push_filelist(const char *archive_status_dir, const char *first_file, int batch_size); +static parray *setup_archive_subdirs(parray *batch_files, const char *archive_dir); + +static xlogFileType +get_xlogFileType(const char *filename) +{ + + if IsXLogFileName(filename) + return SEGMENT; + + else if IsPartialXLogFileName(filename) + return PARTIAL_SEGMENT; + + else if IsBackupHistoryFileName(filename) + return BACKUP_HISTORY_FILE; + + else if IsTLHistoryFileName(filename) + return HISTORY_FILE; + + else if IsBackupHistoryFileName(filename) + return BACKUP_HISTORY_FILE; + + return UNKNOWN; +} /* * At this point, we already done one roundtrip to archive server @@ -137,6 +164,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *pg /* files to push in multi-thread mode */ parray *batch_files = NULL; + parray *archive_subdirs = NULL; int n_threads; if (!no_ready_rename || batch_size > 1) @@ -160,6 +188,20 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *pg parray_num(batch_files), batch_size, is_compress ? "zlib" : "none"); + /* Extract subdirectories */ + archive_subdirs = setup_archive_subdirs(batch_files, instanceState->instance_wal_subdir_path); + if (archive_subdirs) + { + for (i = 0; i < parray_num(archive_subdirs); i++) + { + char *subdir = (char *) parray_get(archive_subdirs, i); + if (fio_mkdir(subdir, DIR_PERMISSION, FIO_BACKUP_HOST) != 0) + elog(ERROR, "Cannot create subdirectory in WAL archive: '%s'", subdir); + pg_free(subdir); + } + parray_free(archive_subdirs); + } + num_threads = n_threads; /* Single-thread push @@ -339,12 +381,12 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir, if (!is_compress) rc = push_file_internal_uncompressed(xlogfile->name, pg_xlog_dir, archive_dir, overwrite, no_sync, - archive_timeout); + archive_timeout, xlogfile->type); #ifdef HAVE_LIBZ else rc = push_file_internal_gz(xlogfile->name, pg_xlog_dir, archive_dir, overwrite, no_sync, compress_level, - archive_timeout); + archive_timeout, xlogfile->type); #endif /* take '--no-ready-rename' flag into account */ @@ -383,13 +425,14 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir, int push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_dir, const char *archive_dir, bool overwrite, bool no_sync, - uint32 archive_timeout) + uint32 archive_timeout, xlogFileType type) { FILE *in = NULL; int out = -1; char *buf = pgut_malloc(OUT_BUF_SIZE); /* 1MB buffer */ char from_fullpath[MAXPGPATH]; char to_fullpath[MAXPGPATH]; + char archive_subdir[MAXPGPATH]; /* partial handling */ struct stat st; char to_fullpath_part[MAXPGPATH]; @@ -402,8 +445,12 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d /* from path */ join_path_components(from_fullpath, pg_xlog_dir, wal_file_name); canonicalize_path(from_fullpath); + + /* calculate subdir in WAL archive */ + get_archive_subdir(archive_subdir, archive_dir, wal_file_name, type); + /* to path */ - join_path_components(to_fullpath, archive_dir, wal_file_name); + join_path_components(to_fullpath, archive_subdir, wal_file_name); canonicalize_path(to_fullpath); /* Open source file for read */ @@ -622,7 +669,7 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d int push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, const char *archive_dir, bool overwrite, bool no_sync, - int compress_level, uint32 archive_timeout) + int compress_level, uint32 archive_timeout, xlogFileType type) { FILE *in = NULL; gzFile out = NULL; @@ -630,6 +677,7 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, char from_fullpath[MAXPGPATH]; char to_fullpath[MAXPGPATH]; char to_fullpath_gz[MAXPGPATH]; + char archive_subdir[MAXPGPATH]; /* partial handling */ struct stat st; @@ -644,8 +692,12 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, /* from path */ join_path_components(from_fullpath, pg_xlog_dir, wal_file_name); canonicalize_path(from_fullpath); + + /* calculate subdir in WAL archive */ + get_archive_subdir(archive_subdir, archive_dir, wal_file_name, type); + /* to path */ - join_path_components(to_fullpath, archive_dir, wal_file_name); + join_path_components(to_fullpath, archive_subdir, wal_file_name); canonicalize_path(to_fullpath); /* destination file with .gz suffix */ @@ -915,8 +967,8 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file, { int i; WALSegno *xlogfile = NULL; - parray *status_files = NULL; - parray *batch_files = parray_new(); + parray *status_files = NULL; + parray *batch_files = parray_new(); /* guarantee that first filename is in batch list */ xlogfile = palloc(sizeof(WALSegno)); @@ -924,6 +976,8 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file, snprintf(xlogfile->name, MAXFNAMELEN, "%s", first_file); parray_append(batch_files, xlogfile); + xlogfile->type = get_xlogFileType(xlogfile->name); + if (batch_size < 2) return batch_files; @@ -955,6 +1009,8 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file, pg_atomic_init_flag(&xlogfile->lock); snprintf(xlogfile->name, MAXFNAMELEN, "%s", filename); + + xlogfile->type = get_xlogFileType(xlogfile->name); parray_append(batch_files, xlogfile); if (parray_num(batch_files) >= batch_size) @@ -1023,7 +1079,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha /* full filepath to WAL file in archive directory. * $BACKUP_PATH/wal/instance_name/000000010000000000000001 */ - join_path_components(backup_wal_file_path, instanceState->instance_wal_subdir_path, wal_file_name); + //join_path_components(backup_wal_file_path, instanceState->instance_wal_subdir_path, wal_file_name); INSTR_TIME_SET_CURRENT(start_time); if (num_threads > batch_size) @@ -1152,7 +1208,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha while (fail_count < 3) { - if (get_wal_file(wal_file_name, backup_wal_file_path, absolute_wal_file_path, false)) + if (get_wal_file_wrapper(wal_file_name, instanceState->instance_wal_subdir_path, absolute_wal_file_path, false)) { fail_count = 0; elog(INFO, "pg_probackup archive-get copied WAL file %s", wal_file_name); @@ -1235,7 +1291,7 @@ uint32 run_wal_prefetch(const char *prefetch_dir, const char *archive_dir, /* It is ok, maybe requested batch is greater than the number of available * files in the archive */ - if (!get_wal_file(xlogfile->name, from_fullpath, to_fullpath, true)) + if (!get_wal_file_wrapper(xlogfile->name, archive_dir, to_fullpath, true)) { elog(LOG, "Thread [%d]: Failed to prefetch WAL segment %s", 0, xlogfile->name); break; @@ -1309,7 +1365,7 @@ get_files(void *arg) join_path_components(from_fullpath, args->archive_dir, xlogfile->name); join_path_components(to_fullpath, args->prefetch_dir, xlogfile->name); - if (!get_wal_file(xlogfile->name, from_fullpath, to_fullpath, true)) + if (!get_wal_file_wrapper(xlogfile->name, args->archive_dir, to_fullpath, true)) { /* It is ok, maybe requested batch is greater than the number of available * files in the archive @@ -1328,6 +1384,38 @@ get_files(void *arg) return NULL; } +/* + * First we try to copy from WAL archive subdirectory: + * Failing that, try WAL archive root directory + */ +bool +get_wal_file_wrapper(const char *filename, const char *archive_root_dir, + const char *to_fullpath, bool prefetch_mode) +{ + bool success = false; + char archive_subdir[MAXPGPATH]; + char from_fullpath[MAXPGPATH]; + xlogFileType type = get_xlogFileType(filename); + + if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE) + { + /* first try subdir ... */ + get_archive_subdir(archive_subdir, archive_root_dir, filename, type); + join_path_components(from_fullpath, archive_subdir, filename); + + success = get_wal_file(filename, from_fullpath, to_fullpath, prefetch_mode); + } + + if (!success) + { + /* ... fallback to archive dir for backward compatibility purposes */ + join_path_components(from_fullpath, archive_root_dir, filename); + success = get_wal_file(filename, from_fullpath, to_fullpath, prefetch_mode); + } + + return success; +} + /* * Copy WAL segment from archive catalog to pgdata with possible decompression. * When running in prefetch mode, we should not error out. @@ -1730,3 +1818,68 @@ uint32 maintain_prefetch(const char *prefetch_dir, XLogSegNo first_segno, uint32 return n_files; } + +/* Calculate subdir path in WAL archive directory. Example: + * 000000010000000200000013 -> 00000002 + */ +void +get_archive_subdir(char *archive_subdir, const char *archive_dir, const char *wal_file_name, xlogFileType type) +{ + if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE) + { + int rc = 0; + char tli[MAXFNAMELEN]; + char log[MAXFNAMELEN]; + char suffix[MAXFNAMELEN]; + + rc = sscanf(wal_file_name, "%08s%08s%s", + (char *) &tli, (char *) &log, (char *) &suffix); + + if (rc == 3) + { + join_path_components(archive_subdir, archive_dir, log); + return; + } + } + + /* for all other files just use root directory of WAL archive */ + strcpy(archive_subdir, archive_dir); +} + +/* Extract array of WAL archive subdirs using push filelist */ +parray* +setup_archive_subdirs(parray *batch_files, const char *archive_dir) +{ + int i; + parray *subdirs = NULL; + char *cur_subdir = NULL; + + /* + * - Do we need to sort batch_files? + * - No, we rely on sorting of status files + */ + + for (i = 0; i < parray_num(batch_files); i++) + { + WALSegno *xlogfile = (WALSegno *) parray_get(batch_files, i); + + if (xlogfile->type == SEGMENT || xlogfile->type == PARTIAL_SEGMENT || xlogfile->type == BACKUP_HISTORY_FILE) + { + char subdir[MAXPGPATH]; + + if (!subdirs) + subdirs = parray_new(); + + get_archive_subdir(subdir, archive_dir, xlogfile->name, xlogfile->type); + + /* do not append the same subdir twice */ + if (cur_subdir && strcmp(cur_subdir, subdir) == 0) + continue; + + cur_subdir = pgut_strdup(subdir); + parray_append(subdirs, cur_subdir); + } + } + + return subdirs; +} diff --git a/src/backup.c b/src/backup.c index c575865c4..af45cac92 100644 --- a/src/backup.c +++ b/src/backup.c @@ -1237,17 +1237,12 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l int timeout_elevel, bool in_stream_dir) { XLogSegNo targetSegNo; - char wal_segment_path[MAXPGPATH], + char wal_segment_path[MAXPGPATH], /* used only for reporting */ wal_segment[MAXFNAMELEN]; - bool file_exists = false; uint32 try_count = 0, timeout; char *wal_delivery_str = in_stream_dir ? "streamed":"archived"; -#ifdef HAVE_LIBZ - char gz_wal_segment_path[MAXPGPATH]; -#endif - /* Compute the name of the WAL file containing requested LSN */ GetXLogSegNo(target_lsn, targetSegNo, instance_config.xlog_seg_size); if (in_prev_segment) @@ -1255,7 +1250,16 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l GetXLogFileName(wal_segment, tli, targetSegNo, instance_config.xlog_seg_size); - join_path_components(wal_segment_path, wal_segment_dir, wal_segment); + // obtain WAL archive subdir for ARCHIVE backup + if (in_stream_dir) + join_path_components(wal_segment_path, wal_segment_dir, wal_segment); + else + { + char wal_segment_subdir[MAXPGPATH]; + get_archive_subdir(wal_segment_subdir, wal_segment_dir, wal_segment, SEGMENT); + join_path_components(wal_segment_path, wal_segment_subdir, wal_segment); + } + /* * In pg_start_backup we wait for 'target_lsn' in 'pg_wal' directory if it is * stream and non-page backup. Page backup needs archived WAL files, so we @@ -1276,30 +1280,10 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l elog(LOG, "Looking for LSN %X/%X in segment: %s", (uint32) (target_lsn >> 32), (uint32) target_lsn, wal_segment); -#ifdef HAVE_LIBZ - snprintf(gz_wal_segment_path, sizeof(gz_wal_segment_path), "%s.gz", - wal_segment_path); -#endif - /* Wait until target LSN is archived or streamed */ while (true) { - if (!file_exists) - { - file_exists = fileExists(wal_segment_path, FIO_BACKUP_HOST); - - /* Try to find compressed WAL file */ - if (!file_exists) - { -#ifdef HAVE_LIBZ - file_exists = fileExists(gz_wal_segment_path, FIO_BACKUP_HOST); - if (file_exists) - elog(LOG, "Found compressed WAL segment: %s", wal_segment_path); -#endif - } - else - elog(LOG, "Found WAL segment: %s", wal_segment_path); - } + bool file_exists = IsWalFileExists(wal_segment, wal_segment_dir, in_stream_dir); if (file_exists) { @@ -1312,7 +1296,7 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l */ if (!XRecOffIsNull(target_lsn) && wal_contains_lsn(wal_segment_dir, target_lsn, tli, - instance_config.xlog_seg_size)) + instance_config.xlog_seg_size, !in_stream_dir)) /* Target LSN was found */ { elog(LOG, "Found LSN: %X/%X", (uint32) (target_lsn >> 32), (uint32) target_lsn); @@ -1908,7 +1892,8 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb if (!read_recovery_info(xlog_path, backup->tli, instance_config.xlog_seg_size, backup->start_lsn, backup->stop_lsn, - &backup->recovery_time)) + &backup->recovery_time, + !backup->stream)) { elog(LOG, "Failed to find Recovery Time in WAL, forced to trust current_timestamp"); backup->recovery_time = stop_backup_result.invocation_time; diff --git a/src/catalog.c b/src/catalog.c index b4ed8c189..d82694a07 100644 --- a/src/catalog.c +++ b/src/catalog.c @@ -1628,6 +1628,9 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance) else if (strcmp(suffix, "gz") != 0) { elog(WARNING, "unexpected WAL file name \"%s\"", file->name); + pgFileFree(file); + parray_remove(xlog_files_list, i); + i--; continue; } } @@ -1724,8 +1727,23 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance) parray_walk(timelines, pfree); parray_free(timelines); } + /* + * Add WAL archive subdirectories to filelist (used only in delete) + * TODO: currently only directory with 8-character name is treated as WAL subdir, is it ok? + */ + else if (S_ISDIR(file->mode) && strspn(file->rel_path, "0123456789ABCDEF") == 8) + { + if (instanceState->wal_archive_subdirs == NULL) + instanceState->wal_archive_subdirs = parray_new(); + parray_append(instanceState->wal_archive_subdirs, file); + } else + { elog(WARNING, "unexpected WAL file name \"%s\"", file->name); + pgFileFree(file); + parray_remove(xlog_files_list, i); + i--; + } } /* save information about backups belonging to each timeline */ @@ -1745,6 +1763,9 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance) parray_append(tlinfo->backups, backup); } } + + /* setup locks */ + xlogfilearray_clear_locks(tlinfo->xlog_filelist); } /* determine oldest backup and closest backup for every timeline */ diff --git a/src/delete.c b/src/delete.c index 6c70ff81e..91a14892a 100644 --- a/src/delete.c +++ b/src/delete.c @@ -29,6 +29,23 @@ static bool backup_deleted = false; /* At least one backup was deleted */ static bool backup_merged = false; /* At least one merge was enacted */ static bool wal_deleted = false; /* At least one WAL segments was deleted */ +typedef struct +{ + parray *xlog_filelist; + int thread_num; + bool purge_all; + XLogSegNo OldestToKeepSegNo; + const char *archive_root_dir; + + /* + * Return value from the thread. + * 0 means there is no error, 1 - there is an error. + */ + int ret; +} delete_files_arg; + +static void *delete_walfiles_in_tli_internal(void *arg); + void do_delete(InstanceState *instanceState, time_t backup_id) { @@ -782,7 +799,7 @@ delete_backup_files(pgBackup *backup) elog(INFO, "Progress: (%zd/%zd). Delete file \"%s\"", i + 1, num_files, full_path); - pgFileDelete(file->mode, full_path); + pgFileDelete(file->mode, full_path, ERROR); } parray_walk(files, pgFileFree); @@ -826,6 +843,10 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli size_t wal_size_actual = 0; char wal_pretty_size[20]; bool purge_all = false; + // multi-thread stuff + pthread_t *threads; + delete_files_arg *threads_args; + bool delete_isok = true; /* Timeline is completely empty */ @@ -925,21 +946,105 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli if (dry_run) return; + /* init thread args with own file lists */ + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + threads_args = (delete_files_arg *) palloc(sizeof(delete_files_arg)*num_threads); + + for (i = 0; i < num_threads; i++) + { + delete_files_arg *arg = &(threads_args[i]); + + arg->purge_all = purge_all; + arg->OldestToKeepSegNo = OldestToKeepSegNo; + arg->archive_root_dir = instanceState->instance_wal_subdir_path; + arg->xlog_filelist = tlinfo->xlog_filelist; + arg->thread_num = i+1; + /* By default there are some error */ + arg->ret = 1; + } + + /* Run threads */ + thread_interrupted = false; + for (i = 0; i < num_threads; i++) + { + delete_files_arg *arg = &(threads_args[i]); + + elog(VERBOSE, "Start thread num: %i", i); + pthread_create(&threads[i], NULL, delete_walfiles_in_tli_internal, arg); + } + + /* Wait threads */ + for (i = 0; i < num_threads; i++) + { + pthread_join(threads[i], NULL); + if (threads_args[i].ret == 1) + delete_isok = false; + } + + /* TODO: */ + //if delete_isok + + /* cleanup */ for (i = 0; i < parray_num(tlinfo->xlog_filelist); i++) { xlogFile *wal_file = (xlogFile *) parray_get(tlinfo->xlog_filelist, i); - if (interrupted) + if (wal_file->deleted) + { + pgXlogFileFree(wal_file); + parray_remove(tlinfo->xlog_filelist, i); + i--; + } + } + pg_free(threads); + pg_free(threads_args); + + /* Remove empty subdirectories */ + if (!instanceState->wal_archive_subdirs) + return; + + for (i = 0; i < parray_num(instanceState->wal_archive_subdirs); i++) + { + char fullpath[MAXPGPATH]; + pgFile *file = (pgFile *) parray_get(instanceState->wal_archive_subdirs, i); + + join_path_components(fullpath, instanceState->instance_wal_subdir_path, file->name); + + if (dir_is_empty(fullpath, FIO_LOCAL_HOST)) + { + pgFileDelete(file->mode, fullpath, WARNING); /* WARNING (not ERROR) due to possible race condition */ + pgFileFree(file); + parray_remove(instanceState->wal_archive_subdirs, i); + i--; + } + } +} + +void * +delete_walfiles_in_tli_internal(void *arg) +{ + int i; + delete_files_arg *args = (delete_files_arg *) arg; + + for (i = 0; i < parray_num(args->xlog_filelist); i++) + { + xlogFile *wal_file = (xlogFile *) parray_get(args->xlog_filelist, i); + + if (interrupted || thread_interrupted) elog(ERROR, "interrupted during WAL archive purge"); - /* Any segment equal or greater than EndSegNo must be kept + if (!pg_atomic_test_set_flag(&wal_file->lock)) + continue; + + /* + * Any segment equal or greater than EndSegNo must be kept * unless it`s a 'purge all' scenario. */ - if (purge_all || wal_file->segno < OldestToKeepSegNo) + if (args->purge_all || wal_file->segno < args->OldestToKeepSegNo) { char wal_fullpath[MAXPGPATH]; - join_path_components(wal_fullpath, instanceState->instance_wal_subdir_path, wal_file->file.name); + join_path_components(wal_fullpath, args->archive_root_dir, wal_file->file.rel_path); /* save segment from purging */ if (instance_config.wal_depth >= 0 && wal_file->keep) @@ -953,8 +1058,8 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli { /* Missing file is not considered as error condition */ if (errno != ENOENT) - elog(ERROR, "Could not remove file \"%s\": %s", - wal_fullpath, strerror(errno)); + elog(ERROR, "[Thread: %d] Could not remove file \"%s\": %s", + args->thread_num, wal_fullpath, strerror(errno)); } else { @@ -969,8 +1074,11 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli } wal_deleted = true; + wal_file->deleted = true; } } + + return NULL; } diff --git a/src/dir.c b/src/dir.c index 4ebe0939b..f114c6282 100644 --- a/src/dir.c +++ b/src/dir.c @@ -231,7 +231,7 @@ pgFileInit(const char *rel_path) * If the pgFile points directory, the directory must be empty. */ void -pgFileDelete(mode_t mode, const char *full_path) +pgFileDelete(mode_t mode, const char *full_path, int elevel) { if (S_ISDIR(mode)) { @@ -242,7 +242,7 @@ pgFileDelete(mode_t mode, const char *full_path) else if (errno == ENOTDIR) /* could be symbolic link */ goto delete_file; - elog(ERROR, "Cannot remove directory \"%s\": %s", + elog(elevel, "Cannot remove directory \"%s\": %s", full_path, strerror(errno)); } return; @@ -253,7 +253,7 @@ pgFileDelete(mode_t mode, const char *full_path) { if (errno == ENOENT) return; - elog(ERROR, "Cannot remove file \"%s\": %s", full_path, + elog(elevel, "Cannot remove file \"%s\": %s", full_path, strerror(errno)); } } @@ -405,6 +405,19 @@ pgFileFree(void *file) pfree(file); } +void +pgXlogFileFree(void *xlogfile) +{ + xlogFile *xlogfile_ptr; + + if (xlogfile == NULL) + return; + + xlogfile_ptr = (xlogFile *) xlogfile; + + pg_free(xlogfile_ptr); +} + /* Compare two pgFile with their path in ascending order of ASCII code. */ int pgFileMapComparePath(const void *f1, const void *f2) @@ -813,6 +826,179 @@ dir_check_file(pgFile *file, bool backup_logs) return CHECK_TRUE; } +/* + * List files, symbolic links and directories in the directory "root" and add + * pgFile objects to "files". We add "root" to "files" if add_root is true. + * + * When follow_symlink is true, symbolic link is ignored and only file or + * directory linked to will be listed. + * + * TODO: make it strictly local + */ +//void +//dir_list_archive(parray *files, const char *root, fio_location location) +//{ +// int rc = 0; +// pgFile *root_file = NULL; +// bool follow_symlink = true; +// bool skip_hidden = true +// const char *errormsg = NULL; +// +// root_file = pgFileNew(root, "", follow_symlink, 0, location); +// +// /* directory was deleted */ +// if (file == NULL) +// return; +// +// if fio_is_remote(fio_location) +// rc = fio_dir_list_archive_internal(files, root_file, root, follow_symlink, +// skip_hidden, location); +// else +// rc = dir_list_archive_internal(files, root_file, root, follow_symlink, +// skip_hidden, location); +// +// pgFileFree(file); +//} + +/* + * Get content of root dir, separate dirs into array + * walk dirs on parallel threads. + * Return codes: + * -1 ERROR encountrered + */ +//int +//dir_list_archive_internal(parray *files, pgFile *parent, const char *parent_dir, +// bool follow_symlink, bool skip_hidden, int external_dir_num, +// fio_location location, char **errormsg) +//{ +// DIR *dir; +// struct dirent *dent; +// parray *dirs; +// +// /* Open directory and list contents */ +// dir = opendir(parent_dir); +// if (dir == NULL) +// { +// if (errno == ENOENT) +// { +// /* Maybe the directory was removed */ +// return; +// } +// +// *errormsg = pgut_malloc(ERRMSG_MAX_LEN); +// snprintf(*errormsg, ERRMSG_MAX_LEN, "Cannot open directory \"%s\": %s", parent_dir, strerror(errno)); +// return -1; +// } +// +// errno = 0; +// while ((dent = readdir(dir))) +// { +// pgFile *file; +// char child[MAXPGPATH]; +// char rel_child[MAXPGPATH]; +// char check_res; +// +// join_path_components(child, parent_dir, dent->d_name); +// join_path_components(rel_child, parent->rel_path, dent->d_name); +// +// file = pgFileNew(child, rel_child, follow_symlink, external_dir_num, +// location); +// if (file == NULL) +// continue; +// +// /* Skip entries point current dir or parent dir */ +// if (S_ISDIR(file->mode) && +// (strcmp(dent->d_name, ".") == 0 || strcmp(dent->d_name, "..") == 0)) +// { +// pgFileFree(file); +// continue; +// } +// +// /* skip hidden files and directories */ +// if (skip_hidden && file->name[0] == '.') +// { +// //elog(WARNING, "Skip hidden file: '%s'", child); +// pgFileFree(file); +// continue; +// } +// +// /* +// * Add only files, directories and links. Skip sockets and other +// * unexpected file formats. +// */ +// if (!S_ISDIR(file->mode) && !S_ISREG(file->mode)) +// { +// pgFileFree(file); +// continue; +// } +// +// parray_append(files, file); +// +// /* +// * If the entry is a directory call dir_list_file_internal() +// * recursively. +// */ +// if (S_ISDIR(file->mode)) +// parray_append(dirs, file); +// } +// +// if (errno && errno != ENOENT) +// { +// +// *errormsg = pgut_malloc(ERRMSG_MAX_LEN); +// snprintf(*errormsg, ERRMSG_MAX_LEN, "Cannot read directory \"%s\": %s", +// parent_dir, strerror(errno_tmp)); +// +// closedir(dir); +// return -1; +// } +// +// closedir(dir); +// +// /* parse directories on multiple parallel threads */ +// /* init thread args with own file lists */ +// threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); +// threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads); +// +// for (i = 0; i < num_threads; i++) +// { +// backup_files_arg *arg = &(threads_args[i]); +// +// arg->nodeInfo = nodeInfo; +// arg->from_root = instance_config.pgdata; +// arg->to_root = current.database_dir; +// arg->external_prefix = external_prefix; +// arg->external_dirs = external_dirs; +// arg->files_list = backup_files_list; +// arg->prev_filelist = prev_backup_filelist; +// arg->prev_start_lsn = prev_backup_start_lsn; +// arg->hdr_map = &(current.hdr_map); +// arg->thread_num = i+1; +// /* By default there are some error */ +// arg->ret = 1; +// } +// +// /* Run threads */ +// thread_interrupted = false; +// elog(INFO, "Start transferring data files"); +// time(&start_time); +// for (i = 0; i < num_threads; i++) +// { +// backup_files_arg *arg = &(threads_args[i]); +// +// elog(VERBOSE, "Start thread num: %i", i); +// pthread_create(&threads[i], NULL, backup_files, arg); +// } +// +// /* Wait threads */ +// for (i = 0; i < num_threads; i++) +// { +// pthread_join(threads[i], NULL); +// if (threads_args[i].ret == 1) +// backup_isok = false; +// } +//} + /* * List files in parent->path directory. If "exclude" is true do not add into * "files" files from pgdata_exclude_files and directories from @@ -1909,3 +2095,17 @@ pfilearray_clear_locks(parray *file_list) pg_atomic_clear_flag(&file->lock); } } + +/* + * Clear the synchronisation locks in a parray of (xlogFile *)'s + */ +void +xlogfilearray_clear_locks(parray *xlog_list) +{ + int i; + for (i = 0; i < parray_num(xlog_list); i++) + { + xlogFile *file = (xlogFile *) parray_get(xlog_list, i); + pg_atomic_clear_flag(&file->lock); + } +} diff --git a/src/merge.c b/src/merge.c index ff39c2510..96a193e82 100644 --- a/src/merge.c +++ b/src/merge.c @@ -809,7 +809,7 @@ merge_chain(InstanceState *instanceState, /* We need full path, file object has relative path */ join_path_components(full_file_path, full_database_dir, full_file->rel_path); - pgFileDelete(full_file->mode, full_file_path); + pgFileDelete(full_file->mode, full_file_path, ERROR); elog(VERBOSE, "Deleted \"%s\"", full_file_path); } } @@ -1143,7 +1143,7 @@ remove_dir_with_files(const char *path) join_path_components(full_path, path, file->rel_path); - pgFileDelete(file->mode, full_path); + pgFileDelete(file->mode, full_path, ERROR); elog(VERBOSE, "Deleted \"%s\"", full_path); } diff --git a/src/parsexlog.c b/src/parsexlog.c index 7f1ca9c75..28266c211 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -115,6 +115,7 @@ typedef struct XLogReaderData gzFile gz_xlogfile; char gz_xlogpath[MAXPGPATH]; #endif + bool honor_subdirs; } XLogReaderData; /* Function to process a WAL record */ @@ -172,7 +173,8 @@ static bool RunXLogThreads(const char *archivedir, bool consistent_read, xlog_record_function process_record, XLogRecTarget *last_rec, - bool inclusive_endpoint); + bool inclusive_endpoint, + bool honor_subdirs); //static XLogReaderState *InitXLogThreadRead(xlog_thread_arg *arg); static bool SwitchThreadToNextWal(XLogReaderState *xlogreader, xlog_thread_arg *arg); @@ -254,7 +256,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size, extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId, InvalidXLogRecPtr, end_tli, wal_seg_size, startpoint, endpoint, false, extractPageInfo, - NULL, true); + NULL, true, true); else { /* We have to process WAL located on several different xlog intervals, @@ -348,7 +350,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size, extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId, InvalidXLogRecPtr, tmp_interval->tli, wal_seg_size, tmp_interval->begin_lsn, tmp_interval->end_lsn, - false, extractPageInfo, NULL, inclusive_endpoint); + false, extractPageInfo, NULL, inclusive_endpoint, true); if (!extract_isok) break; @@ -377,7 +379,7 @@ validate_backup_wal_from_start_to_stop(pgBackup *backup, got_endpoint = RunXLogThreads(archivedir, 0, InvalidTransactionId, InvalidXLogRecPtr, tli, xlog_seg_size, backup->start_lsn, backup->stop_lsn, - false, NULL, NULL, true); + false, NULL, NULL, true, !backup->stream); if (!got_endpoint) { @@ -450,6 +452,7 @@ validate_wal(pgBackup *backup, const char *archivedir, elog(WARNING, "Backup %s WAL segments are corrupted", backup_id); return; } + /* * If recovery target is provided check that we can restore backup to a * recovery target time or xid. @@ -490,7 +493,8 @@ validate_wal(pgBackup *backup, const char *archivedir, all_wal = all_wal || RunXLogThreads(archivedir, target_time, target_xid, target_lsn, tli, wal_seg_size, backup->stop_lsn, - InvalidXLogRecPtr, true, validateXLogRecord, &last_rec, true); + InvalidXLogRecPtr, true, validateXLogRecord, &last_rec, true, + true); if (last_rec.rec_time > 0) time2iso(last_timestamp, lengthof(last_timestamp), timestamptz_to_time_t(last_rec.rec_time), false); @@ -532,7 +536,7 @@ validate_wal(pgBackup *backup, const char *archivedir, bool read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size, XLogRecPtr start_lsn, XLogRecPtr stop_lsn, - time_t *recovery_time) + time_t *recovery_time, bool honor_subdirs) { XLogRecPtr startpoint = stop_lsn; XLogReaderState *xlogreader; @@ -549,6 +553,7 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size, xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size, false, true, true); + reader_data.honor_subdirs = honor_subdirs; /* Read records from stop_lsn down to start_lsn */ do @@ -608,7 +613,7 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size, */ bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, - TimeLineID target_tli, uint32 wal_seg_size) + TimeLineID target_tli, uint32 wal_seg_size, bool honor_subdirs) { XLogReaderState *xlogreader; XLogReaderData reader_data; @@ -626,6 +631,7 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, elog(ERROR, "Out of memory"); xlogreader->system_identifier = instance_config.system_identifier; + reader_data.honor_subdirs = honor_subdirs; #if PG_VERSION_NUM >= 130000 if (XLogRecPtrIsInvalid(target_lsn)) @@ -1012,34 +1018,124 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, /* Try to switch to the next WAL segment */ if (!reader_data->xlogexists) { - char xlogfname[MAXFNAMELEN]; - char partial_file[MAXPGPATH]; + bool compressed = false; + char xlogfname[MAXFNAMELEN]; +// char partial_file[MAXPGPATH]; + char fullpath[MAXPGPATH]; + char fullpath_gz[MAXPGPATH]; + char fullpath_partial_gz[MAXPGPATH]; GetXLogFileName(xlogfname, reader_data->tli, reader_data->xlogsegno, wal_seg_size); - join_path_components(reader_data->xlogpath, wal_archivedir, xlogfname); - snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath); + /* obtain WAL archive subdir for ARCHIVE backup */ + // TODO: move to separate function and rewrite it + if (reader_data->honor_subdirs) + { + char archive_subdir[MAXPGPATH]; + get_archive_subdir(archive_subdir, wal_archivedir, xlogfname, SEGMENT); + + /* default value for xlogpath for error message */ + snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", archive_subdir, xlogfname); + + /* check existence of wal_dir/xlogid/segment.gz file ... */ + snprintf(fullpath_gz, MAXPGPATH, "%s/%s.gz", archive_subdir, xlogfname); + + //TODO: rewrite it to something less ugly +#ifdef HAVE_LIBZ + if (fileExists(fullpath_gz, FIO_LOCAL_HOST)) + { + snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", archive_subdir, xlogfname); + snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_gz); + compressed = true; + goto file_found; + } + + /* ... failing that check existence of wal_dir/xlogid/segment.partial.gz ... */ + snprintf(fullpath_partial_gz, MAXPGPATH, "%s/%s.partial.gz", archive_subdir, xlogfname); + if (fileExists(fullpath_partial_gz, FIO_LOCAL_HOST)) + { + snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s.partial", archive_subdir, xlogfname); + snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_partial_gz); + compressed = true; + goto file_found; + } +#endif + /* ... failing that check existence of wal_dir/xlogid/segment ... */ + snprintf(fullpath, MAXPGPATH, "%s/%s", archive_subdir, xlogfname); + if (fileExists(fullpath, FIO_LOCAL_HOST)) + { + snprintf(reader_data->xlogpath, MAXPGPATH, "%s", fullpath); + goto file_found; + } + + goto archive_dir; + } + /* use directory as-is */ + else + { + /* default value for xlogpath for error message */ + snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", wal_archivedir, xlogfname); +archive_dir: +#ifdef HAVE_LIBZ + /* ... failing that check existence of wal_dir/segment.gz ... */ + snprintf(fullpath_gz, MAXPGPATH, "%s/%s.gz", wal_archivedir, xlogfname); + if (fileExists(fullpath_gz, FIO_LOCAL_HOST)) + { + snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_gz); + snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", wal_archivedir, xlogfname); + compressed = true; + + goto file_found; + } + + /* ... failing that check existence of wal_dir/segment.partial.gz ... */ + snprintf(fullpath_partial_gz, MAXPGPATH, "%s/%s.partial.gz", wal_archivedir, xlogfname); + if (fileExists(wal_archivedir, FIO_LOCAL_HOST)) + { + snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s.partial", wal_archivedir, xlogfname); + snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_partial_gz); + compressed = true; + goto file_found; + } +#endif + /* ... failing that check existence of wal_dir/segment ... */ + snprintf(fullpath, MAXPGPATH, "%s/%s", wal_archivedir, xlogfname); + if (fileExists(fullpath, FIO_LOCAL_HOST)) + { + snprintf(reader_data->xlogpath, MAXPGPATH, "%s", fullpath); + goto file_found; + } + } + +file_found: + canonicalize_path(reader_data->xlogpath); + +#ifdef HAVE_LIBZ + if (compressed) + canonicalize_path(reader_data->gz_xlogpath); +#endif + +// snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath); /* We fall back to using .partial segment in case if we are running * multi-timeline incremental backup right after standby promotion. * TODO: it should be explicitly enabled. */ - snprintf(partial_file, MAXPGPATH, "%s.partial", reader_data->xlogpath); +// snprintf(partial_file, MAXPGPATH, "%s.partial", reader_data->xlogpath); /* If segment do not exists, but the same * segment with '.partial' suffix does, use it instead */ - if (!fileExists(reader_data->xlogpath, FIO_LOCAL_HOST) && - fileExists(partial_file, FIO_LOCAL_HOST)) - { - snprintf(reader_data->xlogpath, MAXPGPATH, "%s", partial_file); - } +// if (!fileExists(reader_data->xlogpath, FIO_LOCAL_HOST) && +// fileExists(partial_file, FIO_LOCAL_HOST)) +// { +// snprintf(reader_data->xlogpath, MAXPGPATH, "%s", partial_file); +// } - if (fileExists(reader_data->xlogpath, FIO_LOCAL_HOST)) + if (!compressed) { elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"", reader_data->thread_num, reader_data->xlogpath); - reader_data->xlogexists = true; reader_data->xlogfile = fio_open(reader_data->xlogpath, O_RDONLY | PG_BINARY, FIO_LOCAL_HOST); @@ -1050,15 +1146,16 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, strerror(errno)); return -1; } + else + reader_data->xlogexists = true; } #ifdef HAVE_LIBZ /* Try to open compressed WAL segment */ - else if (fileExists(reader_data->gz_xlogpath, FIO_LOCAL_HOST)) + else { elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"", reader_data->thread_num, reader_data->gz_xlogpath); - reader_data->xlogexists = true; reader_data->gz_xlogfile = fio_gzopen(reader_data->gz_xlogpath, "rb", -1, FIO_LOCAL_HOST); if (reader_data->gz_xlogfile == NULL) @@ -1068,6 +1165,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, strerror(errno)); return -1; } + else + reader_data->xlogexists = true; } #endif /* Exit without error if WAL segment doesn't exist */ @@ -1191,7 +1290,7 @@ RunXLogThreads(const char *archivedir, time_t target_time, TransactionId target_xid, XLogRecPtr target_lsn, TimeLineID tli, uint32 segment_size, XLogRecPtr startpoint, XLogRecPtr endpoint, bool consistent_read, xlog_record_function process_record, - XLogRecTarget *last_rec, bool inclusive_endpoint) + XLogRecTarget *last_rec, bool inclusive_endpoint, bool honor_subdirs) { pthread_t *threads; xlog_thread_arg *thread_args; @@ -1255,6 +1354,7 @@ RunXLogThreads(const char *archivedir, time_t target_time, consistent_read, false); arg->reader_data.xlogsegno = segno_next; arg->reader_data.thread_num = i + 1; + arg->reader_data.honor_subdirs = honor_subdirs; arg->process_record = process_record; arg->startpoint = startpoint; arg->endpoint = endpoint; @@ -1482,7 +1582,7 @@ XLogThreadWorker(void *arg) reader_data->thread_num, (uint32) (errptr >> 32), (uint32) (errptr)); - /* In we failed to read record located at endpoint position, + /* If we failed to read record located at endpoint position, * and endpoint is not inclusive, do not consider this as an error. */ if (!thread_arg->inclusive_endpoint && @@ -1509,6 +1609,7 @@ XLogThreadWorker(void *arg) if (thread_arg->process_record) thread_arg->process_record(xlogreader, reader_data, &stop_reading); + if (stop_reading) { thread_arg->got_target = true; @@ -1915,7 +2016,7 @@ bool validate_wal_segment(TimeLineID tli, XLogSegNo segno, const char *prefetch_ rc = RunXLogThreads(prefetch_dir, 0, InvalidTransactionId, InvalidXLogRecPtr, tli, wal_seg_size, - startpoint, endpoint, false, NULL, NULL, true); + startpoint, endpoint, false, NULL, NULL, true, false); num_threads = tmp_num_threads; @@ -1946,4 +2047,65 @@ static XLogReaderState* WalReaderAllocate(uint32 wal_seg_size, XLogReaderData *r #else return XLogReaderAllocate(&SimpleXLogPageRead, reader_data); #endif -} \ No newline at end of file +} + +/* + * Is WAL file exists in archive directory + * for stream backup check uncompressed segment in wal_root_dir + * for archive backup first check subdirectory, then fallback to archive directory + */ +bool IsWalFileExists(const char *wal_segment_name, const char *wal_root_dir, bool in_stream_dir) +{ + char wal_file_fullpath[MAXPGPATH]; + char wal_file_fullpath_gz[MAXPGPATH]; + char wal_segment_subdir[MAXPGPATH]; + + if (in_stream_dir) + { + join_path_components(wal_file_fullpath, wal_root_dir, wal_segment_name); + if (fileExists(wal_file_fullpath, FIO_BACKUP_HOST)) + goto found_uncompressed_file; + + goto not_found; + } + + /* obtain subdir in WAL archive */ + get_archive_subdir(wal_segment_subdir, wal_root_dir, wal_segment_name, SEGMENT); + + /* first try uncompressed segment in WAL archive subdir ... */ + join_path_components(wal_file_fullpath, wal_segment_subdir, wal_segment_name); + if (fileExists(wal_file_fullpath, FIO_BACKUP_HOST)) + goto found_uncompressed_file; + +#ifdef HAVE_LIBZ + /* ... fallback to compressed segment in WAL archive subdir ... */ + snprintf(wal_file_fullpath_gz, MAXPGPATH, "%s.gz", wal_file_fullpath); + if (fileExists(wal_file_fullpath_gz, FIO_BACKUP_HOST)) + goto found_compressed_file; +#endif + + /* ... fallback to uncompressed segment in archive dir ... */ + join_path_components(wal_file_fullpath, wal_root_dir, wal_segment_name); + if (fileExists(wal_file_fullpath, FIO_BACKUP_HOST)) + goto found_uncompressed_file; + + /* ... fallback to compressed segment in archive dir */ +#ifdef HAVE_LIBZ + snprintf(wal_file_fullpath_gz, MAXPGPATH, "%s.gz", wal_file_fullpath); + if (fileExists(wal_file_fullpath_gz, FIO_BACKUP_HOST)) + goto found_compressed_file; +#endif + + goto not_found; + +found_compressed_file: + elog(LOG, "Found compressed WAL segment: %s", wal_file_fullpath); + return true; + +found_uncompressed_file: + elog(LOG, "Found WAL segment: %s", wal_file_fullpath_gz); + return true; + +not_found: + return false; +} diff --git a/src/pg_probackup.c b/src/pg_probackup.c index 49e226ace..d7351b939 100644 --- a/src/pg_probackup.c +++ b/src/pg_probackup.c @@ -495,6 +495,7 @@ main(int argc, char *argv[]) catalogState->wal_subdir_path, instanceState->instance_name); join_path_components(instanceState->instance_config_path, instanceState->instance_backup_subdir_path, BACKUP_CATALOG_CONF_FILE); + instanceState->wal_archive_subdirs = NULL; } /* ===== instanceState (END) ======*/ diff --git a/src/pg_probackup.h b/src/pg_probackup.h index b828343dc..dce4dd4d9 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -338,7 +338,7 @@ typedef enum ShowFormat #define BYTES_INVALID (-1) /* file didn`t changed since previous backup, DELTA backup do not rely on it */ #define FILE_NOT_FOUND (-2) /* file disappeared during backup */ #define BLOCKNUM_INVALID (-1) -#define PROGRAM_VERSION "2.5.3" +#define PROGRAM_VERSION "2.5.4" /* update when remote agent API or behaviour changes */ #define AGENT_PROTOCOL_VERSION 20501 @@ -647,9 +647,11 @@ typedef struct lsnInterval typedef enum xlogFileType { + UNKNOWN, SEGMENT, - TEMP_SEGMENT, + TEMP_SEGMENT, // '.part' segment created by archive-push PARTIAL_SEGMENT, + HISTORY_FILE, BACKUP_HISTORY_FILE } xlogFileType; @@ -660,6 +662,8 @@ typedef struct xlogFile xlogFileType type; bool keep; /* Used to prevent removal of WAL segments * required by ARCHIVE backups. */ + bool deleted; + volatile pg_atomic_flag lock;/* lock for synchronization of parallel threads */ } xlogFile; @@ -814,6 +818,8 @@ typedef struct InstanceState /* $BACKUP_PATH/backups/instance_name */ char instance_wal_subdir_path[MAXPGPATH]; // previously global var arclog_path + parray *wal_archive_subdirs; + /* TODO: Make it more specific */ PGconn *conn; @@ -894,6 +900,7 @@ extern void do_archive_push(InstanceState *instanceState, InstanceConfig *instan bool no_sync, bool no_ready_rename); extern void do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const char *prefetch_dir_arg, char *wal_file_path, char *wal_file_name, int batch_size, bool validate_wal); +extern void get_archive_subdir(char *archive_subdir, const char * archive_dir, const char *wal_file_name, xlogFileType type); /* in configure.c */ extern void do_show_config(void); @@ -1048,16 +1055,18 @@ extern int dir_create_dir(const char *path, mode_t mode, bool strict); extern bool dir_is_empty(const char *path, fio_location location); extern bool fileExists(const char *path, fio_location location); +extern bool IsWalFileExists(const char *wal_segment_name, const char *archive_dir, bool in_stream_dir); extern size_t pgFileSize(const char *path); extern pgFile *pgFileNew(const char *path, const char *rel_path, bool follow_symlink, int external_dir_num, fio_location location); extern pgFile *pgFileInit(const char *rel_path); -extern void pgFileDelete(mode_t mode, const char *full_path); +extern void pgFileDelete(mode_t mode, const char *full_path, int elevel); extern void fio_pgFileDelete(pgFile *file, const char *full_path); extern void pgFileFree(void *file); +extern void pgXlogFileFree(void *xlogfile); extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok); extern pg_crc32 pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok); @@ -1075,6 +1084,7 @@ extern int pgCompareString(const void *str1, const void *str2); extern int pgPrefixCompareString(const void *str1, const void *str2); extern int pgCompareOid(const void *f1, const void *f2); extern void pfilearray_clear_locks(parray *file_list); +extern void xlogfilearray_clear_locks(parray *xlog_list); /* in data.c */ extern bool check_data_file(ConnectionArgs *arguments, pgFile *file, @@ -1137,9 +1147,9 @@ extern bool validate_wal_segment(TimeLineID tli, XLogSegNo segno, extern bool read_recovery_info(const char *archivedir, TimeLineID tli, uint32 seg_size, XLogRecPtr start_lsn, XLogRecPtr stop_lsn, - time_t *recovery_time); + time_t *recovery_time, bool honor_subdirs); extern bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, - TimeLineID target_tli, uint32 seg_size); + TimeLineID target_tli, uint32 seg_size, bool honor_subdirs); extern XLogRecPtr get_prior_record_lsn(const char *archivedir, XLogRecPtr start_lsn, XLogRecPtr stop_lsn, TimeLineID tli, bool seek_prev_segment, uint32 seg_size); diff --git a/src/utils/file.c b/src/utils/file.c index 7d1df554b..004a5f563 100644 --- a/src/utils/file.c +++ b/src/utils/file.c @@ -3196,7 +3196,7 @@ fio_delete(mode_t mode, const char *fullpath, fio_location location) } else - pgFileDelete(mode, fullpath); + pgFileDelete(mode, fullpath, ERROR); } static void @@ -3204,7 +3204,7 @@ fio_delete_impl(mode_t mode, char *buf) { char *fullpath = (char*) buf; - pgFileDelete(mode, fullpath); + pgFileDelete(mode, fullpath, ERROR); /* TODO: must return rc, not error out internally */ } /* Execute commands at remote host */ diff --git a/tests/archive.py b/tests/archive.py index 22b9d8693..53300c574 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -369,7 +369,7 @@ def test_archive_push_file_exists(self): self.add_instance(backup_dir, 'node', node) self.set_archiving(backup_dir, 'node', node) - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') if self.archive_compress: filename = '000000010000000000000001.gz' file = os.path.join(wals_dir, filename) @@ -377,6 +377,8 @@ def test_archive_push_file_exists(self): filename = '000000010000000000000001' file = os.path.join(wals_dir, filename) + os.makedirs(wals_dir) + with open(file, 'a+b') as f: f.write(b"blablablaadssaaaaaaaaaaaaaaa") f.flush() @@ -461,7 +463,7 @@ def test_archive_push_file_exists_overwrite(self): self.add_instance(backup_dir, 'node', node) self.set_archiving(backup_dir, 'node', node) - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') if self.archive_compress: filename = '000000010000000000000001.gz' file = os.path.join(wals_dir, filename) @@ -469,6 +471,8 @@ def test_archive_push_file_exists_overwrite(self): filename = '000000010000000000000001' file = os.path.join(wals_dir, filename) + os.makedirs(wals_dir) + with open(file, 'a+b') as f: f.write(b"blablablaadssaaaaaaaaaaaaaaa") f.flush() @@ -565,7 +569,7 @@ def test_archive_push_partial_file_exists(self): filename_orig = filename_orig.decode('utf-8') # form up path to next .part WAL segment - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') if self.archive_compress: filename = filename_orig + '.gz' + '.part' file = os.path.join(wals_dir, filename) @@ -573,6 +577,8 @@ def test_archive_push_partial_file_exists(self): filename = filename_orig + '.part' file = os.path.join(wals_dir, filename) +# os.makedirs(wals_dir) + # emulate stale .part file with open(file, 'a+b') as f: f.write(b"blahblah") @@ -1111,6 +1117,7 @@ def test_archive_pg_receivexlog(self): self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) node.slow_start() + if self.get_version(node) < 100000: pg_receivexlog_path = self.get_bin_path('pg_receivexlog') else: @@ -1462,7 +1469,7 @@ def test_archive_catalog(self): self.assertTrue(timeline['status'], 'OK') # create holes in t3 - wals_dir = os.path.join(backup_dir, 'wal', 'replica') + wals_dir = os.path.join(backup_dir, 'wal', 'replica', '00000000') wals = [ f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f)) and not f.endswith('.backup') and not f.endswith('.history') and f.startswith('00000003') @@ -1472,17 +1479,17 @@ def test_archive_catalog(self): # check that t3 is ok self.show_archive(backup_dir) - file = os.path.join(backup_dir, 'wal', 'replica', '000000030000000000000017') + file = os.path.join(wals_dir, '000000030000000000000017') if self.archive_compress: file = file + '.gz' os.remove(file) - file = os.path.join(backup_dir, 'wal', 'replica', '000000030000000000000012') + file = os.path.join(wals_dir, '000000030000000000000012') if self.archive_compress: file = file + '.gz' os.remove(file) - file = os.path.join(backup_dir, 'wal', 'replica', '000000030000000000000013') + file = os.path.join(wals_dir, '000000030000000000000013') if self.archive_compress: file = file + '.gz' os.remove(file) @@ -1597,7 +1604,7 @@ def test_archive_catalog_1(self): self.backup_node(backup_dir, 'node', node) node.pgbench_init(scale=2) - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') original_file = os.path.join(wals_dir, '000000010000000000000001.gz') tmp_file = os.path.join(wals_dir, '000000010000000000000001') @@ -1652,7 +1659,7 @@ def test_archive_catalog_2(self): self.backup_node(backup_dir, 'node', node) node.pgbench_init(scale=2) - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') original_file = os.path.join(wals_dir, '000000010000000000000001.gz') tmp_file = os.path.join(wals_dir, '000000010000000000000001') @@ -2503,7 +2510,7 @@ def test_archive_get_prefetch_corruption(self): sleep(20) # now copy WAL files into prefetch directory and corrupt some of them - archive_dir = os.path.join(backup_dir, 'wal', 'node') + archive_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') files = os.listdir(archive_dir) files.sort() @@ -2589,7 +2596,7 @@ def test_archive_show_partial_files_handling(self): self.backup_node(backup_dir, 'node', node) - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') # .part file node.safe_psql( diff --git a/tests/compatibility.py b/tests/compatibility.py index e274c22be..36a626a7c 100644 --- a/tests/compatibility.py +++ b/tests/compatibility.py @@ -615,6 +615,11 @@ def test_backward_compatibility_merge_1(self): merge them with new binary. old binary version =< 2.2.7 """ + if self.version_to_num(self.old_probackup_version) > self.version_to_num('2.2.7'): + self.assertTrue( + False, + 'You need pg_probackup old_binary =< 2.2.7 for this test') + fname = self.id().split('.')[3] backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') node = self.make_simple_node( @@ -1475,10 +1480,78 @@ def test_compatibility_tablespace(self): if self.paranoia: pgdata = self.pgdata_content(node.data_dir) + pgdata_restored = self.pgdata_content(node_restored.data_dir) + self.compare_pgdata(pgdata, pgdata_restored) + + # Clean after yourself + self.del_test_dir(module_name, fname) + + # @unittest.expectedFailure + # @unittest.skip("skip") + def test_archive_subdir(self): + """ + https://github.com/postgrespro/pg_probackup/issues/449 + + Make sure that our WAL reader can fallback from subdir to archive dir + old binary version =< 2.5.2 + """ + if self.version_to_num(self.old_probackup_version) > self.version_to_num('2.5.2'): + self.assertTrue( + False, 'OLD pg_probackup binary must be =< 2.5.2 for this test') + + self.assertNotEqual( + self.version_to_num(self.old_probackup_version), + self.version_to_num(self.probackup_version)) + + fname = self.id().split('.')[3] + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + node = self.make_simple_node( + base_dir=os.path.join(module_name, fname, 'node'), + set_replication=True, + initdb_params=['--data-checksums']) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + self.set_archiving(backup_dir, 'node', node, old_binary=True) + node.slow_start() + + # generate data using old binary + node.pgbench_init(scale=10) + + # TAKE FULL ARCHIVE BACKUP + self.backup_node(backup_dir, 'node', node, old_binary=True) + + # generate some more WAL using old binary + node.pgbench_init(scale=10) + + # generate some WAL using new binary + self.set_archiving(backup_dir, 'node', node) + node.reload() + + # generate some WAL using old binary + pgbench = node.pgbench( + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + options=["-j", "4", "-T", "50"]) + pgbench.wait() + pgbench.stdout.close() + + # TAKE PAGE ARCHIVE BACKUP + self.backup_node(backup_dir, 'node', node, backup_type='page', options=['--archive-timeout=10s']) if self.paranoia: - pgdata_restored = self.pgdata_content(node_restored.data_dir) + pgdata = self.pgdata_content(node.data_dir) + + node.cleanup() + + self.restore_node(backup_dir, 'node', node, options=["-j", "4"]) + + if self.paranoia: + pgdata_restored = self.pgdata_content(node.data_dir) self.compare_pgdata(pgdata, pgdata_restored) + node.slow_start() + node.stop() + # Clean after yourself self.del_test_dir(module_name, fname) diff --git a/tests/delete.py b/tests/delete.py index 345a70284..993e63734 100644 --- a/tests/delete.py +++ b/tests/delete.py @@ -203,9 +203,10 @@ def test_delete_increment_ptrack(self): self.set_archiving(backup_dir, 'node', node) node.slow_start() - node.safe_psql( - 'postgres', - 'CREATE EXTENSION ptrack') + if node.major_version >= 12: + node.safe_psql( + 'postgres', + 'CREATE EXTENSION ptrack') # full backup mode self.backup_node(backup_dir, 'node', node) @@ -263,7 +264,7 @@ def test_delete_orphaned_wal_segments(self): node.stop() # Check wals - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f))] original_wal_quantity = len(wals) @@ -299,8 +300,7 @@ def test_delete_orphaned_wal_segments(self): # Delete last backup self.delete_pb(backup_dir, 'node', backup_3_id, options=['--wal']) - wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f))] - self.assertEqual (0, len(wals), "Number of wals should be equal to 0") + self.assertFalse(os.path.exists(wals_dir), "Number of wals should be equal to 0") # Clean after yourself self.del_test_dir(module_name, fname) diff --git a/tests/expected/option_version.out b/tests/expected/option_version.out index 8b212ac1f..a69cee03d 100644 --- a/tests/expected/option_version.out +++ b/tests/expected/option_version.out @@ -1 +1 @@ -pg_probackup 2.5.3 \ No newline at end of file +pg_probackup 2.5.4 \ No newline at end of file diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py index 3b14b7170..a6636f538 100644 --- a/tests/helpers/ptrack_helpers.py +++ b/tests/helpers/ptrack_helpers.py @@ -1239,7 +1239,8 @@ def delete_pb( options=[], old_binary=False, gdb=False, asynchronous=False): cmd_list = [ 'delete', - '-B', backup_dir + '-B', backup_dir, + '-j', '10' ] cmd_list += ['--instance={0}'.format(instance)] @@ -1253,7 +1254,8 @@ def delete_expired( cmd_list = [ 'delete', '-B', backup_dir, - '--instance={0}'.format(instance) + '--instance={0}'.format(instance), + '-j', '10' ] return self.run_pb(cmd_list + options, old_binary=old_binary) @@ -1308,9 +1310,14 @@ def set_archiving( options['archive_mode'] = 'on' if custom_archive_command is None: + if old_binary: + binary_path = self.probackup_old_path + else: + binary_path = self.probackup_path + if os.name == 'posix': options['archive_command'] = '"{0}" archive-push -B {1} --instance={2} '.format( - self.probackup_path, backup_dir, instance) + binary_path, backup_dir, instance) elif os.name == 'nt': options['archive_command'] = '"{0}" archive-push -B {1} --instance={2} '.format( diff --git a/tests/retention.py b/tests/retention.py index 19204807b..906c7833f 100644 --- a/tests/retention.py +++ b/tests/retention.py @@ -59,7 +59,7 @@ def test_retention_redundancy_1(self): min_wal = output_after['min-segno'] max_wal = output_after['max-segno'] - for wal_name in os.listdir(os.path.join(backup_dir, 'wal', 'node')): + for wal_name in os.listdir(os.path.join(backup_dir, 'wal', 'node', '00000000')): if not wal_name.endswith(".backup"): if self.archive_compress: diff --git a/tests/validate.py b/tests/validate.py index 0b04d92fe..22212879c 100644 --- a/tests/validate.py +++ b/tests/validate.py @@ -1545,7 +1545,7 @@ def test_validate_corrupt_wal_1(self): backup_id_2 = self.backup_node(backup_dir, 'node', node) # Corrupt WAL - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f)) and not f.endswith('.backup')] wals.sort() for wal in wals: @@ -1610,7 +1610,7 @@ def test_validate_corrupt_wal_2(self): target_xid = res[0][0] # Corrupt WAL - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f)) and not f.endswith('.backup')] wals.sort() for wal in wals: @@ -1673,10 +1673,10 @@ def test_validate_wal_lost_segment_1(self): backup_id = self.backup_node(backup_dir, 'node', node) # Delete wal segment - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f)) and not f.endswith('.backup')] wals.sort() - file = os.path.join(backup_dir, 'wal', 'node', wals[-1]) + file = os.path.join(wals_dir, wals[-1]) os.remove(file) # cut out '.gz' @@ -1778,7 +1778,7 @@ def test_validate_corrupt_wal_between_backups(self): self.backup_node(backup_dir, 'node', node) # Corrupt WAL - wals_dir = os.path.join(backup_dir, 'wal', 'node') + wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000') with open(os.path.join(wals_dir, walfile), "rb+", 0) as f: f.seek(9000) f.write(b"b")