-
Notifications
You must be signed in to change notification settings - Fork 86
/
Copy pathdata.c
2701 lines (2296 loc) · 74 KB
/
data.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*-------------------------------------------------------------------------
*
* data.c: utils to parse and backup data pages
*
* Portions Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
* Portions Copyright (c) 2015-2022, Postgres Professional
*
*-------------------------------------------------------------------------
*/
#include "pg_probackup.h"
#include "storage/checksum.h"
#include "storage/checksum_impl.h"
#include <common/pg_lzcompress.h>
#include "utils/file.h"
#include <unistd.h>
#include <sys/stat.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
#include "utils/thread.h"
/*
 * Buffer that pairs a BackupPageHeader with BLCKSZ bytes of page data, to
 * ease operations on relation pages.
 * Note: this is a struct, not a union - both members are present at once,
 * the header first and the page bytes right after it.
 */
typedef struct DataPage
{
BackupPageHeader bph;
char data[BLCKSZ];
} DataPage;
static bool get_page_header(FILE *in, const char *fullpath, BackupPageHeader *bph,
pg_crc32 *crc, bool use_crc32c);
static BackupPageHeader2_v1*
get_data_file_headers_v1(HeaderMap *hdr_map, pgFile *file, uint32 backup_version, bool strict);
#ifdef HAVE_LIBZ
/*
 * zlib compression backend.
 *
 * Compresses src_size bytes from src into dst (capacity dst_size) with
 * compress2() at the given level. Returns the compressed length when
 * compress2() reports Z_OK, otherwise the code returned by compress2().
 */
static int32
zlib_compress(void *dst, size_t dst_size, void const *src, size_t src_size,
			  int level)
{
	uLongf		out_len = dst_size;
	int			zrc;

	zrc = compress2(dst, &out_len, src, src_size, level);
	if (zrc != Z_OK)
		return zrc;

	return out_len;
}
/*
 * zlib decompression backend.
 *
 * Inflates src_size bytes from src into dst (capacity dst_size) with
 * uncompress(). Returns the decompressed length when uncompress() reports
 * Z_OK, otherwise the code returned by uncompress().
 */
static int32
zlib_decompress(void *dst, size_t dst_size, void const *src, size_t src_size)
{
	uLongf		out_len = dst_size;
	int			zrc;

	zrc = uncompress(dst, &out_len, src, src_size);
	if (zrc != Z_OK)
		return zrc;

	return out_len;
}
#endif
/*
 * Compress src into dst using the requested algorithm.
 *
 * Returns the number of bytes written to dst. For NONE_COMPRESS,
 * NOT_DEFINED_COMPRESS and any algorithm not handled below the result is -1.
 * For ZLIB_COMPRESS the result of zlib_compress() is returned as is, so a
 * failure yields the (negative) zlib code rather than -1; in that case
 * *errormsg, if errormsg is not NULL, is pointed at the zError() text.
 * For PGLZ_COMPRESS the result of pglz_compress() is returned unchanged.
 */
int32
do_compress(void *dst, size_t dst_size, void const *src, size_t src_size,
			CompressAlg alg, int level, const char **errormsg)
{
	int32		result = -1;

	switch (alg)
	{
		case NONE_COMPRESS:
		case NOT_DEFINED_COMPRESS:
			/* nothing to do, report failure */
			break;
#ifdef HAVE_LIBZ
		case ZLIB_COMPRESS:
			result = zlib_compress(dst, dst_size, src, src_size, level);
			if (errormsg != NULL && result < Z_OK)
				*errormsg = zError(result);
			break;
#endif
		case PGLZ_COMPRESS:
			result = pglz_compress(src, src_size, dst, PGLZ_strategy_always);
			break;
	}

	return result;
}
/*
 * Decompress src into dst using the requested algorithm.
 *
 * Returns the number of bytes placed into dst. NONE_COMPRESS and
 * NOT_DEFINED_COMPRESS are rejected with -1 and, if errormsg is not NULL,
 * a static "Invalid compression algorithm" message. For ZLIB_COMPRESS the
 * result of zlib_decompress() is returned as is (a negative zlib code on
 * failure, with *errormsg pointed at the zError() text when errormsg is not
 * NULL). For PGLZ_COMPRESS the result of pglz_decompress() is returned
 * unchanged. Any other algorithm yields -1.
 */
int32
do_decompress(void *dst, size_t dst_size, void const *src, size_t src_size,
			  CompressAlg alg, const char **errormsg)
{
	int32		result = -1;

	switch (alg)
	{
		case NONE_COMPRESS:
		case NOT_DEFINED_COMPRESS:
			if (errormsg != NULL)
				*errormsg = "Invalid compression algorithm";
			break;
#ifdef HAVE_LIBZ
		case ZLIB_COMPRESS:
			result = zlib_decompress(dst, dst_size, src, src_size);
			if (errormsg != NULL && result < Z_OK)
				*errormsg = zError(result);
			break;
#endif
		case PGLZ_COMPRESS:
			/* pglz_decompress() takes an extra argument since PG 12 */
#if PG_VERSION_NUM >= 120000
			result = pglz_decompress(src, src_size, dst, dst_size, true);
#else
			result = pglz_decompress(src, src_size, dst, dst_size);
#endif
			break;
	}

	return result;
}
#define ZLIB_MAGIC 0x78
/*
 * Before version 2.0.23 there was a bug in pg_probackup: pages whose
 * compressed size was exactly the same as the original size were not
 * treated as compressed. This check tries to detect such pages so that the
 * caller can attempt to decompress them.
 *
 * There is no 100% reliable criterion to tell whether a page is compressed,
 * so the answer is "maybe" (true) only when all of the following hold:
 *  - the page header does not look valid;
 *  - the backup was made by a version older than 2.0.23;
 *  - for zlib (when built with HAVE_LIBZ), the first byte is ZLIB_MAGIC.
 * In every other case false is returned.
 */
static bool
page_may_be_compressed(Page page, CompressAlg alg, uint32 backup_version)
{
	PageHeader	hdr = (PageHeader) page;
	bool		header_ok;

	/*
	 * Cheap sanity check of the page header first. The page layout version
	 * is not checked here.
	 */
	header_ok = PageGetPageSize(hdr) == BLCKSZ &&
		(hdr->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
		hdr->pd_lower >= SizeOfPageHeaderData &&
		hdr->pd_lower <= hdr->pd_upper &&
		hdr->pd_upper <= hdr->pd_special &&
		hdr->pd_special <= BLCKSZ &&
		hdr->pd_special == MAXALIGN(hdr->pd_special);

	/* A valid header means the page is stored as is */
	if (header_ok)
		return false;

	/* Versions 2.0.23 and higher don't have such bug */
	if (backup_version >= 20023)
		return false;

#ifdef HAVE_LIBZ

	/*
	 * For zlib we can check the stream magic byte:
	 * https://stackoverflow.com/questions/9050260/what-does-a-zlib-header-look-like
	 */
	if (alg == ZLIB_COMPRESS && *(char *) page != ZLIB_MAGIC)
		return false;
#endif

	/* otherwise let the caller try to decompress the page */
	return true;
}
/*
 * Verify a page header.
 *
 * Always stores the LSN taken from the header into *lsn, then returns true
 * only if every header sanity check passes (page size, flag bits, ordering
 * of pd_lower/pd_upper/pd_special, and alignment of pd_special).
 * The page layout version is not checked.
 */
bool
parse_page(Page page, XLogRecPtr *lsn)
{
	PageHeader	hdr = (PageHeader) page;

	/* Get lsn from page header, regardless of header validity */
	*lsn = PageXLogRecPtrGet(hdr->pd_lsn);

	if (PageGetPageSize(hdr) != BLCKSZ)
		return false;
	if ((hdr->pd_flags & ~PD_VALID_FLAG_BITS) != 0)
		return false;
	if (hdr->pd_lower < SizeOfPageHeaderData)
		return false;
	if (hdr->pd_lower > hdr->pd_upper)
		return false;
	if (hdr->pd_upper > hdr->pd_special)
		return false;
	if (hdr->pd_special > BLCKSZ)
		return false;
	if (hdr->pd_special != MAXALIGN(hdr->pd_special))
		return false;

	return true;
}
/*
 * We know that the header is invalid; store specific details in errormsg.
 *
 * Allocates ERRMSG_MAX_LEN bytes with pgut_malloc(), stores the pointer in
 * *errormsg and formats a message naming the first failed check, in the
 * order tested below. If none of the individual checks fails, a generic
 * "page header invalid" message is stored. The caller owns the buffer and
 * is expected to release it.
 *
 * Arguments printed with "%lu" are explicitly cast to unsigned long: the
 * page size, the header size and the MAXALIGN result are size_t/uintptr_t
 * sized values in the PostgreSQL headers, which do not match "%lu" on
 * platforms where long is narrower than size_t (e.g. 64-bit Windows).
 */
void
get_header_errormsg(Page page, char **errormsg)
{
	PageHeader	phdr = (PageHeader) page;

	*errormsg = pgut_malloc(ERRMSG_MAX_LEN);

	if (PageGetPageSize(phdr) != BLCKSZ)
		snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
				 "page size %lu is not equal to block size %u",
				 (unsigned long) PageGetPageSize(phdr), BLCKSZ);

	else if (phdr->pd_lower < SizeOfPageHeaderData)
		snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
				 "pd_lower %i is less than page header size %lu",
				 phdr->pd_lower, (unsigned long) SizeOfPageHeaderData);

	else if (phdr->pd_lower > phdr->pd_upper)
		snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
				 "pd_lower %u is greater than pd_upper %u",
				 phdr->pd_lower, phdr->pd_upper);

	else if (phdr->pd_upper > phdr->pd_special)
		snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
				 "pd_upper %u is greater than pd_special %u",
				 phdr->pd_upper, phdr->pd_special);

	else if (phdr->pd_special > BLCKSZ)
		snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
				 "pd_special %u is greater than block size %u",
				 phdr->pd_special, BLCKSZ);

	else if (phdr->pd_special != MAXALIGN(phdr->pd_special))
		snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
				 "pd_special %i is misaligned, expected %lu",
				 phdr->pd_special,
				 (unsigned long) MAXALIGN(phdr->pd_special));

	else if (phdr->pd_flags & ~PD_VALID_FLAG_BITS)
		snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
				 "pd_flags mask contain illegal bits");

	else
		snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid");
}
/*
 * We know that checksums are mismatched; store specific details in errormsg.
 *
 * Allocates ERRMSG_MAX_LEN bytes with pgut_malloc(), stores the pointer in
 * *errormsg and formats a message with both checksums. The caller owns the
 * buffer and is expected to release it.
 *
 * "calculated" is the checksum computed over the page contents by
 * pg_checksum_page() for block absolute_blkno; "expected" is the value
 * stored in the page header (pd_checksum). The arguments are passed in that
 * order so that they match the wording of the message.
 */
void
get_checksum_errormsg(Page page, char **errormsg, BlockNumber absolute_blkno)
{
	PageHeader	phdr = (PageHeader) page;
	uint16		stored_checksum = phdr->pd_checksum;
	uint16		calculated_checksum = pg_checksum_page(page, absolute_blkno);

	*errormsg = pgut_malloc(ERRMSG_MAX_LEN);

	snprintf(*errormsg, ERRMSG_MAX_LEN,
			 "page verification failed, "
			 "calculated checksum %u but expected %u",
			 calculated_checksum,
			 stored_checksum);
}
/*
 * Retrieves a page taking the backup mode into account
 * and writes it into argument "page". Argument "page"
 * should be a pointer to allocated BLCKSZ of bytes.
 *
 * The block is read at offset blknum * BLCKSZ of "in" and validated with
 * validate_one_page(); reading is retried up to PAGE_READ_ATTEMPTS times
 * while the page does not pass validation. "page_st" is handed to
 * validate_one_page() and its lsn field is afterwards compared against
 * prev_backup_start_lsn. "strict" selects the reaction to corruption:
 * ERROR when true, WARNING when false.
 *
 * Prints appropriate warnings/errors/etc into log.
 * Returns:
 *                 PageIsOk(0) if page was successfully retrieved
 *         PageIsTruncated(-1) if the page was truncated
 *         SkipCurrentPage(-2) if we need to skip this page,
 *                                only used for DELTA and PTRACK backup
 *         PageIsCorrupted(-3) if the page checksum mismatch
 *                                or header corruption,
 *                                only used for checkdb
 *                                TODO: probably we should always
 *                                      return it to the caller
 */
static int32
prepare_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
BlockNumber blknum, FILE *in,
BackupMode backup_mode,
Page page, bool strict,
uint32 checksum_version,
const char *from_fullpath,
PageState *page_st)
{
int try_again = PAGE_READ_ATTEMPTS;
bool page_is_valid = false;
/* block number within the whole relation, not within this segment file */
BlockNumber absolute_blknum = file->segno * RELSEG_SIZE + blknum;
/* last result of validate_one_page(); stays 0 if no full block was ever read */
int rc = 0;
/* check for interrupt */
if (interrupted || thread_interrupted)
elog(ERROR, "Interrupted during page reading");
/*
 * Read the page and verify its header and checksum.
 * Under high write load it's possible that we've read partly
 * flushed page, so try several times before throwing an error.
 */
while (!page_is_valid && try_again--)
{
/* read the block */
int read_len = fio_pread(in, page, ((int64)blknum) * BLCKSZ);
/* The block could have been truncated. It is fine. */
if (read_len == 0)
{
elog(VERBOSE, "Cannot read block %u of \"%s\": "
"block truncated", blknum, from_fullpath);
return PageIsTruncated;
}
else if (read_len < 0)
elog(ERROR, "Cannot read block %u of \"%s\": %s",
blknum, from_fullpath, strerror(errno));
/* short read: warn and go for another attempt; rc is left untouched */
else if (read_len != BLCKSZ)
elog(WARNING, "Cannot read block %u of \"%s\": "
"read %i of %d, try again",
blknum, from_fullpath, read_len, BLCKSZ);
else
{
/* We have BLCKSZ of raw data, validate it */
rc = validate_one_page(page, absolute_blknum,
InvalidXLogRecPtr, page_st,
checksum_version);
switch (rc)
{
case PAGE_IS_ZEROED:
elog(VERBOSE, "File: \"%s\" blknum %u, empty page", from_fullpath, blknum);
return PageIsOk;
case PAGE_IS_VALID:
/*
 * In DELTA or PTRACK modes we must compare lsn, so leave the loop
 * and continue below; in any other mode the page is simply accepted.
 */
if (backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK)
page_is_valid = true;
else
return PageIsOk;
break;
case PAGE_HEADER_IS_INVALID:
elog(VERBOSE, "File: \"%s\" blknum %u have wrong page header, try again",
from_fullpath, blknum);
break;
case PAGE_CHECKSUM_MISMATCH:
elog(VERBOSE, "File: \"%s\" blknum %u have wrong checksum, try again",
from_fullpath, blknum);
break;
default:
Assert(false);
}
}
/* avoid re-reading once buffered data, flushing on further attempts, see PBCKP-150 */
fflush(in);
}
/*
 * If page is not valid after PAGE_READ_ATTEMPTS attempts to read it
 * throw an error.
 */
if (!page_is_valid)
{
int elevel = ERROR;
char *errormsg = NULL;
/*
 * Get the details of corruption. errormsg stays NULL when rc is neither
 * of the two values below (e.g. only short reads happened).
 */
if (rc == PAGE_HEADER_IS_INVALID)
get_header_errormsg(page, &errormsg);
else if (rc == PAGE_CHECKSUM_MISMATCH)
get_checksum_errormsg(page, &errormsg,
file->segno * RELSEG_SIZE + blknum);
/* Error out in case of merge or backup without ptrack support;
 * issue warning in case of checkdb or backup with ptrack support
 */
if (!strict)
elevel = WARNING;
if (errormsg)
elog(elevel, "Corruption detected in file \"%s\", block %u: %s",
from_fullpath, blknum, errormsg);
else
elog(elevel, "Corruption detected in file \"%s\", block %u",
from_fullpath, blknum);
/* NOTE(review): reached only if elog() returned, presumably the WARNING case - confirm */
pg_free(errormsg);
return PageIsCorrupted;
}
/* Checkdb is not going further */
if (!strict)
return PageIsOk;
/*
 * Skip page if page lsn is less than START_LSN of parent backup.
 * Nullified pages must be copied by DELTA backup, just to be safe.
 */
if ((backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
file->exists_in_prev &&
page_st->lsn > 0 &&
page_st->lsn < prev_backup_start_lsn)
{
elog(VERBOSE, "Skipping blknum %u in file: \"%s\", file->exists_in_prev: %s, page_st->lsn: %X/%X, prev_backup_start_lsn: %X/%X",
blknum, from_fullpath,
file->exists_in_prev ? "true" : "false",
(uint32) (page_st->lsn >> 32), (uint32) page_st->lsn,
(uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn);
return SkipCurrentPage;
}
return PageIsOk;
}
/*
 * Compress one page (if it pays off) and append it to the backup file.
 *
 * The record written to "out" is a BackupPageHeader (block number and
 * payload size) immediately followed by the payload. The payload is the
 * compressed page, or the raw BLCKSZ bytes when compression failed or did
 * not make the page smaller than BLCKSZ. The record is also folded into
 * *crc, and file->write_size / file->uncompressed_size are advanced.
 * file->compress_alg is set to calg as a side effect.
 *
 * Returns the payload size (BLCKSZ when the page was stored uncompressed).
 * A failed write is reported via elog(ERROR).
 * The "in" and "page_state" arguments are not used here.
 *
 * TODO: split this function in two: compress() and backup()
 */
static int
compress_and_backup_page(pgFile *file, BlockNumber blknum,
						 FILE *in, FILE *out, pg_crc32 *crc,
						 int page_state, Page page,
						 CompressAlg calg, int clevel,
						 const char *from_fullpath, const char *to_fullpath)
{
	/* compressed page may require more space than uncompressed */
	char		out_buf[BLCKSZ * 2];
	BackupPageHeader *hdr = (BackupPageHeader *) out_buf;
	char	   *payload = out_buf + sizeof(BackupPageHeader);
	const char *compress_err = NULL;
	size_t		out_len = 0;
	int			payload_len = 0;

	/* Compress the page right behind the header slot */
	payload_len = do_compress(payload,
							  sizeof(out_buf) - sizeof(BackupPageHeader),
							  page, BLCKSZ, calg, clevel,
							  &compress_err);

	/* Something went wrong and an error message was assigned, throw a warning */
	if (compress_err != NULL && payload_len < 0)
		elog(WARNING, "An error occured during compressing block %u of file \"%s\": %s",
			 blknum, from_fullpath, compress_err);

	file->compress_alg = calg;	/* TODO: wtf? why here? */

	/* Compression failed or gave no gain: store the page as is */
	if (payload_len <= 0 || payload_len >= BLCKSZ)
	{
		memcpy(payload, page, BLCKSZ);
		payload_len = BLCKSZ;
	}

	hdr->block = blknum;
	hdr->compressed_size = payload_len;
	out_len = payload_len + sizeof(BackupPageHeader);

	/* Update CRC */
	COMP_FILE_CRC32(true, *crc, out_buf, out_len);

	/* write data page */
	if (fio_fwrite(out, out_buf, out_len) != out_len)
		elog(ERROR, "File: \"%s\", cannot write at block %u: %s",
			 to_fullpath, blknum, strerror(errno));

	file->write_size += out_len;
	file->uncompressed_size += BLCKSZ;

	return payload_len;
}
/*
 * Write a page as is: exactly BLCKSZ bytes, no header and no compression.
 *
 * On success advances file->write_size and file->uncompressed_size by
 * BLCKSZ and returns BLCKSZ; returns -1 (leaving the counters untouched)
 * if fio_fwrite() did not write the full block.
 *
 * TODO: make it fastpath option in compress_and_backup_page()
 */
static int
write_page(pgFile *file, FILE *out, Page page)
{
	if (fio_fwrite(out, page, BLCKSZ) == BLCKSZ)
	{
		file->write_size += BLCKSZ;
		file->uncompressed_size += BLCKSZ;
		return BLCKSZ;
	}

	/* short or failed write */
	return -1;
}
/*
 * Backup data file in the from_root directory to the to_root directory with
 * same relative path. If prev_backup_start_lsn is valid (it is an LSN value,
 * not a pointer) and the file exists in the previous backup, it is passed
 * down for DELTA and PTRACK modes so that only pages with higher lsn are
 * copied.
 * Not just copy file, but read it block by block (use bitmap in case of
 * incremental backup), validate checksum, optionally compress and write to
 * backup with special header.
 *
 * Results are reported through "file":
 *   write_size == BYTES_INVALID   file is considered unchanged, nothing stored
 *   write_size == FILE_NOT_FOUND  source file was missing
 *   otherwise read_size / write_size / uncompressed_size / crc / n_blocks
 *   describe what was processed.
 * All other failures are raised with elog(ERROR).
 * Except on the early "unchanged" return, file->pagemap.bitmap is released
 * at the end (the pointer itself is not reset).
 */
void
backup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath,
XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
CompressAlg calg, int clevel, uint32 checksum_version,
HeaderMap *hdr_map, bool is_merge)
{
/* number of blocks read, or one of the negative error codes checked below */
int64 rc;
bool use_pagemap;
char *errmsg = NULL;
BlockNumber err_blknum = 0;
/* page headers */
BackupPageHeader2 *headers = NULL;
/* sanity */
if (file->size % BLCKSZ != 0)
elog(WARNING, "File: \"%s\", invalid file size %zu", from_fullpath, file->size);
/*
 * Compute expected number of blocks in the file.
 * NOTE This is a normal situation, if the file size has changed
 * since the moment we computed it.
 */
file->n_blocks = file->size/BLCKSZ;
/*
 * Skip unchanged file only if it exists in previous backup.
 * This way we can correctly handle null-sized files which are
 * not tracked by pagemap and thus always marked as unchanged.
 */
if ((backup_mode == BACKUP_MODE_DIFF_PAGE ||
backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
file->pagemap.bitmapsize == PageBitmapIsEmpty &&
file->exists_in_prev && !file->pagemap_isabsent)
{
/*
 * There are no changed blocks since last backup. We want to make
 * incremental backup, so we should exit.
 */
file->write_size = BYTES_INVALID;
return;
}
/* reset size summary */
file->read_size = 0;
file->write_size = 0;
file->uncompressed_size = 0;
INIT_FILE_CRC32(true, file->crc);
/*
 * Read each page, verify checksum and write it to backup.
 * If page map is empty or file is not present in previous backup
 * backup all pages of the relation.
 *
 * In PTRACK 1.x there was a problem
 * of data files with missing _ptrack map.
 * Such files should be fully copied.
 */
if (file->pagemap.bitmapsize == PageBitmapIsEmpty ||
file->pagemap_isabsent || !file->exists_in_prev ||
!file->pagemap.bitmap)
use_pagemap = false;
else
use_pagemap = true;
/* Remote mode */
if (fio_is_remote(FIO_DB_HOST))
{
rc = fio_send_pages(to_fullpath, from_fullpath, file,
/* send prev backup START_LSN */
(backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
calg, clevel, checksum_version,
/* send pagemap if any */
use_pagemap,
/* variables for error reporting */
&err_blknum, &errmsg, &headers);
}
else
{
/* TODO: stop handling errors internally */
/* NOTE: the local path does not fill err_blknum/errmsg, they keep their initial values */
rc = send_pages(to_fullpath, from_fullpath, file,
/* send prev backup START_LSN */
(backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
calg, clevel, checksum_version, use_pagemap,
&headers, backup_mode);
}
/* check for errors */
if (rc == FILE_MISSING)
{
/* a vanished file is fatal for merge, otherwise it is just logged */
elog(is_merge ? ERROR : LOG, "File not found: \"%s\"", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
err_blknum, to_fullpath, strerror(errno));
else if (rc == PAGE_CORRUPTION)
{
if (errmsg)
elog(ERROR, "Corruption detected in file \"%s\", block %u: %s",
from_fullpath, err_blknum, errmsg);
else
elog(ERROR, "Corruption detected in file \"%s\", block %u",
from_fullpath, err_blknum);
}
/* OPEN_FAILED and READ_FAILED */
else if (rc == OPEN_FAILED)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot open file \"%s\"", from_fullpath);
}
else if (rc == READ_FAILED)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot read file \"%s\"", from_fullpath);
}
/* no error code matched: rc is used as the number of blocks read */
file->read_size = rc * BLCKSZ;
/* refresh n_blocks for FULL and DELTA */
if (backup_mode == BACKUP_MODE_FULL ||
backup_mode == BACKUP_MODE_DIFF_DELTA)
file->n_blocks = file->read_size / BLCKSZ;
/* Determine that file didn't change in case of incremental backup */
if (backup_mode != BACKUP_MODE_FULL &&
file->exists_in_prev &&
file->write_size == 0 &&
file->n_blocks > 0)
{
file->write_size = BYTES_INVALID;
}
cleanup:
/* finish CRC calculation */
FIN_FILE_CRC32(true, file->crc);
/* dump page headers */
write_page_headers(headers, file, hdr_map, is_merge);
pg_free(errmsg);
pg_free(file->pagemap.bitmap);
pg_free(headers);
}
/*
 * Catchup data file in the from_root directory to the to_root directory with
 * same relative path. If sync_lsn is valid (it is an LSN value, not a
 * pointer) and the file exists in the destination, it is passed down for
 * DELTA and PTRACK modes so that only pages with equal or higher lsn are
 * copied.
 * Not just copy file, but read it block by block (use bitmap in case of
 * incremental catchup), validate page checksum.
 *
 * prev_size is compared with file->size to decide whether a PTRACK file
 * with an empty pagemap can be skipped entirely.
 *
 * Results are reported through "file":
 *   write_size == BYTES_INVALID   file is considered unchanged, nothing copied
 *   write_size == FILE_NOT_FOUND  source file was missing
 *   otherwise read_size / write_size / uncompressed_size describe the copy.
 * All other failures are raised with elog(ERROR).
 * Unlike backup_data_file(), no file CRC is computed and no page headers
 * are collected here.
 */
void
catchup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath,
XLogRecPtr sync_lsn, BackupMode backup_mode,
uint32 checksum_version, size_t prev_size)
{
/* number of blocks read, or one of the negative error codes checked below */
int rc;
bool use_pagemap;
char *errmsg = NULL;
BlockNumber err_blknum = 0;
/*
 * Compute expected number of blocks in the file.
 * NOTE This is a normal situation, if the file size has changed
 * since the moment we computed it.
 */
file->n_blocks = file->size/BLCKSZ;
/*
 * Skip unchanged file only if it exists in destination directory.
 * This way we can correctly handle null-sized files which are
 * not tracked by pagemap and thus always marked as unchanged.
 */
if (backup_mode == BACKUP_MODE_DIFF_PTRACK &&
file->pagemap.bitmapsize == PageBitmapIsEmpty &&
file->exists_in_prev && file->size == prev_size && !file->pagemap_isabsent)
{
/*
 * There are no changed pages.
 */
file->write_size = BYTES_INVALID;
return;
}
/* reset size summary */
file->read_size = 0;
file->write_size = 0;
file->uncompressed_size = 0;
/*
 * If page map is empty or file is not present in destination directory,
 * then copy all pages of the relation.
 */
if (file->pagemap.bitmapsize == PageBitmapIsEmpty ||
file->pagemap_isabsent || !file->exists_in_prev ||
!file->pagemap.bitmap)
use_pagemap = false;
else
use_pagemap = true;
if (use_pagemap)
elog(LOG, "Using pagemap for file \"%s\"", file->rel_path);
/* Remote mode */
if (fio_is_remote(FIO_DB_HOST))
{
rc = fio_copy_pages(to_fullpath, from_fullpath, file,
/* send prev backup START_LSN */
((backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
file->exists_in_prev) ? sync_lsn : InvalidXLogRecPtr,
/* pages are copied uncompressed */
NONE_COMPRESS, 1, checksum_version,
/* send pagemap if any */
use_pagemap,
/* variables for error reporting */
&err_blknum, &errmsg);
}
else
{
/* TODO: stop handling errors internally */
/* NOTE: the local path does not fill err_blknum/errmsg, they keep their initial values */
rc = copy_pages(to_fullpath, from_fullpath, file,
/* send prev backup START_LSN */
((backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
file->exists_in_prev) ? sync_lsn : InvalidXLogRecPtr,
checksum_version, use_pagemap, backup_mode);
}
/* check for errors */
if (rc == FILE_MISSING)
{
elog(LOG, "File not found: \"%s\"", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
err_blknum, to_fullpath, strerror(errno));
else if (rc == PAGE_CORRUPTION)
{
if (errmsg)
elog(ERROR, "Corruption detected in file \"%s\", block %u: %s",
from_fullpath, err_blknum, errmsg);
else
elog(ERROR, "Corruption detected in file \"%s\", block %u",
from_fullpath, err_blknum);
}
/* OPEN_FAILED and READ_FAILED */
else if (rc == OPEN_FAILED)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot open file \"%s\"", from_fullpath);
}
else if (rc == READ_FAILED)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot read file \"%s\"", from_fullpath);
}
/* rc is a plain int here, widen it before multiplying to avoid overflow */
file->read_size = ((int64)rc) * BLCKSZ;
/* Determine that file didn't change in case of incremental catchup */
if (backup_mode != BACKUP_MODE_FULL &&
file->exists_in_prev &&
file->write_size == 0 &&
file->n_blocks > 0)
{
file->write_size = BYTES_INVALID;
}
cleanup:
pg_free(errmsg);
pg_free(file->pagemap.bitmap);
}
/*
 * Backup a non-data file. No compression is applied to such files.
 *
 * global/pg_control (outside of external directories) is always copied via
 * copy_pgcontrol_file(). For any other file, except the relmapper file,
 * that exists in the previous backup and whose mtime is not newer than the
 * parent backup time, the current CRC is computed and compared with the
 * one recorded in prev_file: if they match, the file is marked with
 * write_size = BYTES_INVALID and not copied. In every remaining case the
 * file is copied by backup_non_data_file_internal().
 *
 * The backup_mode argument is not used here.
 */
void
backup_non_data_file(pgFile *file, pgFile *prev_file,
					 const char *from_fullpath, const char *to_fullpath,
					 BackupMode backup_mode, time_t parent_backup_time,
					 bool missing_ok)
{
	bool		is_pg_control;
	bool		may_be_unchanged;

	/* special treatment for global/pg_control */
	is_pg_control = (file->external_dir_num == 0 &&
					 strcmp(file->rel_path, XLOG_CONTROL_FILE) == 0);
	if (is_pg_control)
	{
		copy_pgcontrol_file(from_fullpath, FIO_DB_HOST,
							to_fullpath, FIO_BACKUP_HOST, file);
		return;
	}

	/*
	 * Candidate for skipping: not the relmapper file, present in the
	 * previous backup and not modified after the parent backup started.
	 */
	may_be_unchanged = (pg_strcasecmp(file->name, RELMAPPER_FILENAME) != 0 &&
						prev_file != NULL &&
						file->exists_in_prev &&
						file->mtime <= parent_backup_time);

	if (may_be_unchanged)
	{
		/*
		 * The file could be deleted under our feet, but then
		 * backup_non_data_file_internal will handle it safely.
		 */
		file->crc = fio_get_crc32(from_fullpath, FIO_DB_HOST, false, true);

		/* Same checksum as in the previous backup: skip copying the file */
		if (EQ_TRADITIONAL_CRC32(file->crc, prev_file->crc))
		{
			file->write_size = BYTES_INVALID;
			return;
		}
	}

	backup_non_data_file_internal(from_fullpath, FIO_DB_HOST,
								  to_fullpath, file, missing_ok);
}
/*
 * Iterate over parent backup chain and lookup given destination file in
 * filelist of every chain member.
 * Apply changed blocks to destination file from every backup in parent chain.
 *
 * Traversal order depends on use_bitmap: with the bitmap the chain is walked
 * from the destination backup (index 0) towards the FULL backup (last
 * index); without it, from the FULL backup towards the destination backup.
 *
 * Chain members where the file is absent, was not backed up
 * (write_size == BYTES_INVALID) or is empty (write_size == 0) are skipped.
 * For the others the backup copy is opened, page headers are loaded when
 * use_headers is set and the file has any, and the blocks are applied to
 * "out" by restore_data_file_internal().
 *
 * Returns the sum of the values returned by restore_data_file_internal().
 * Failures to open/close a backup file or to load headers are raised with
 * elog(ERROR).
 */
size_t
restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out,
const char *to_fullpath, bool use_bitmap, PageState *checksum_map,
XLogRecPtr shift_lsn, datapagemap_t *lsn_map, bool use_headers)
{
size_t total_write_len = 0;
/* stdio buffer shared by every input file opened in the loop below */
char *in_buf = pgut_malloc(STDIO_BUFSIZE);
int backup_seq = 0;
/*
 * FULL -> INCR -> DEST
 *  2       1       0
 * Restore of backups of older versions cannot be optimized with bitmap
 * because of n_blocks
 */
if (use_bitmap)
/* start with dest backup  */
backup_seq = 0;
else
/* start with full backup */
backup_seq = parray_num(parent_chain) - 1;
//	for (i = parray_num(parent_chain) - 1; i >= 0; i--)
//	for (i = 0; i < parray_num(parent_chain); i++)
/* both bounds are checked because the index moves up or down depending on use_bitmap */
while (backup_seq >= 0 && backup_seq < parray_num(parent_chain))
{
char from_root[MAXPGPATH];
char from_fullpath[MAXPGPATH];
FILE *in = NULL;
pgFile **res_file = NULL;
pgFile *tmp_file = NULL;
/* page headers */
BackupPageHeader2 *headers = NULL;
pgBackup *backup = (pgBackup *) parray_get(parent_chain, backup_seq);
/* advance the index right away, so the "continue" statements below are safe */
if (use_bitmap)
backup_seq++;
else
backup_seq--;
/* lookup file in intermediate backup */
res_file = parray_bsearch(backup->files, dest_file, pgFileCompareRelPathWithExternal);
tmp_file = (res_file) ? *res_file : NULL;
/* Destination file does not exist yet at this moment */
if (tmp_file == NULL)
continue;
/*
 * Skip file if it hasn't changed since previous backup
 * and thus was not backed up.
 */
if (tmp_file->write_size == BYTES_INVALID)
continue;
/* If file was truncated in intermediate backup,
 * it is ok not to truncate it now, because old blocks will be
 * overwritten by new blocks from next backup.
 */
if (tmp_file->write_size == 0)
continue;
/*
 * At this point we are sure, that something is going to be copied
 * Open source file.
 */
join_path_components(from_root, backup->root_dir, DATABASE_DIR);
join_path_components(from_fullpath, from_root, tmp_file->rel_path);
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
elog(ERROR, "Cannot open backup file \"%s\": %s", from_fullpath,
strerror(errno));
/* set stdio buffering for input data file */
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
/* get headers for this file */
if (use_headers && tmp_file->n_headers > 0)
headers = get_data_file_headers(&(backup->hdr_map), tmp_file,
parse_program_version(backup->program_version),
true,
backup->large_file);
/* headers were requested and expected, but could not be obtained */
if (use_headers && !headers && tmp_file->n_headers > 0)
elog(ERROR, "Failed to get page headers for file \"%s\"", from_fullpath);
/*
 * Restore the file.
 * Datafiles are backed up block by block and every block
 * have BackupPageHeader with meta information, so we cannot just
 * copy the file from backup.
 */
total_write_len += restore_data_file_internal(in, out, tmp_file,
parse_program_version(backup->program_version),
from_fullpath, to_fullpath, dest_file->n_blocks,
use_bitmap ? &(dest_file)->pagemap : NULL,
checksum_map, backup->checksum_version,
/* shiftmap can be used only if backup state precedes the shift */
backup->stop_lsn <= shift_lsn ? lsn_map : NULL,
headers);
if (fclose(in) != 0)
elog(ERROR, "Cannot close file \"%s\": %s", from_fullpath,
strerror(errno));
pg_free(headers);
//		datapagemap_print_debug(&(dest_file)->pagemap);
}
pg_free(in_buf);
return total_write_len;
}
/* Restore block from "in" file to "out" file.
* If "nblocks" is greater than zero, then skip restoring blocks,
* whose position if greater than "nblocks".
* If map is NULL, then page bitmap cannot be used for restore optimization
* Page bitmap optimize restore of incremental chains, consisting of more than one
* backup. We restoring from newest to oldest and page, once restored, marked in map.
* When the same page, but in older backup, encountered, we check the map, if it is
* marked as already restored, then page is skipped.
*/
size_t
restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_version,
const char *from_fullpath, const char *to_fullpath, int64 nblocks,
datapagemap_t *map, PageState *checksum_map, int checksum_version,
datapagemap_t *lsn_map, BackupPageHeader2 *headers)
{
BlockNumber blknum = 0;
int n_hdr = -1;
size_t write_len = 0;
off_t cur_pos_out = 0;
off_t cur_pos_in = 0;
/* should not be possible */
Assert(!(backup_version >= 20400 && file->n_headers <= 0));
/*
* We rely on stdio buffering of input and output.
* For buffering to be efficient, we try to minimize the
* number of lseek syscalls, because it forces buffer flush.
* For that, we track current write position in
* output file and issue fseek only when offset of block to be
* written not equal to current write position, which happens
* a lot when blocks from incremental backup are restored,
* but should never happen in case of blocks from FULL backup.
*/
if (fio_fseek(out, cur_pos_out) < 0)
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
for (;;)
{
off_t write_pos;
size_t len;
size_t read_len;
DataPage page;
int32 compressed_size = 0;
bool is_compressed = false;
/* incremental restore vars */
uint16 page_crc = 0;
XLogRecPtr page_lsn = InvalidXLogRecPtr;
/* check for interrupt */
if (interrupted || thread_interrupted)