// tor_hsservice/replay.rs
1//! Facility for detecting and preventing replays on introduction requests.
2//!
3//! If we were to permit the introduction point to replay the same request
4//! multiple times, it would cause the service to contact the rendezvous point
5//! again with the same rendezvous cookie as before, which could help with
6//! traffic analysis.
7//!
8//! (This could also be a DoS vector if the introduction point decided to
9//! overload the service.)
10//!
//! Because we use the same introduction point keys across restarts, we need to
//! make sure that our replay logs are persistent across restarts as well. We do
//! this by using a file on disk.
14
15mod ipt;
16#[cfg(feature = "hs-pow-full")]
17mod pow;
18
19use crate::internal_prelude::*;
20
/// A probabilistic data structure to record fingerprints of observed Introduce2
/// messages.
///
/// We need to record these fingerprints to prevent replay attacks; see the
/// module documentation for an explanation of why that would be bad.
///
/// A ReplayLog should correspond to a `KP_hss_ntor` key, and should have the
/// same lifespan: dropping it sooner will enable replays, but dropping it later
/// will waste disk and memory.
///
/// False positives are allowed, to conserve on space.
pub(crate) struct ReplayLog<T> {
    /// The inner probabilistic data structure.
    seen: data::Filter,
    /// Persistent state file etc., if we're persistent
    ///
    /// If it is `None`, this ReplayLog is ephemeral:
    /// fingerprints are kept only in memory and are lost on drop.
    file: Option<PersistFile>,
    /// [`PhantomData`] so rustc doesn't complain about the unused type param.
    ///
    /// This type represents the type of data that we're storing, as well as the type of the
    /// key/name for that data.
    replay_log_type: PhantomData<T>,
}
45
/// A [`ReplayLog`] for [`Introduce2`](tor_cell::relaycell::msg::Introduce2) messages.
pub(crate) type IptReplayLog = ReplayLog<ipt::IptReplayLogType>;

/// A [`ReplayLog`] for Proof-of-Work [`Nonce`](tor_hscrypto::pow::v1::Nonce)s.
#[cfg(feature = "hs-pow-full")]
pub(crate) type PowNonceReplayLog = ReplayLog<pow::PowNonceReplayLogType>;

/// The length of the [`ReplayLogType::MAGIC`] constant.
///
// TODO: If Rust's constant expressions supported generics we wouldn't need this at all.
const MAGIC_LEN: usize = 32;

/// The length of the message that we store on disk, in bytes.
///
/// If the message is longer than this, then we will need to hash or truncate it before storing it
/// to disk.
///
// TODO: Once const generics are good, this should be an associated constant for ReplayLogType.
pub(crate) const OUTPUT_LEN: usize = 16;
65
/// A trait to represent a set of types that ReplayLog can be used with.
pub(crate) trait ReplayLogType {
    // TODO: It would be nice to encode the directory name as an associated constant here, rather
    // than having the external code pass it in to us.

    /// The name of this item, used for the log filename.
    type Name;

    /// The type of the messages that we are ensuring the uniqueness of.
    type Message;

    /// A magic string that we put at the start of each log file, to make sure that
    /// we don't confuse this file format with others.
    const MAGIC: &'static [u8; MAGIC_LEN];

    /// Convert [`Self::Name`] to a [`String`]
    ///
    /// The result is used as the leafname of the on-disk log file.
    fn format_filename(name: &Self::Name) -> String;

    /// Convert [`Self::Message`] to bytes that will be stored in the log.
    ///
    /// The output is a fixed-size fingerprint of exactly [`OUTPUT_LEN`] bytes.
    fn transform_message(message: &Self::Message) -> [u8; OUTPUT_LEN];

    /// Parse a filename into [`Self::Name`].
    ///
    /// (Inverse of [`format_filename`](Self::format_filename); on failure the
    /// `Err` is a human-readable explanation, for logging.)
    fn parse_log_leafname(leaf: &OsStr) -> Result<Self::Name, Cow<'static, str>>;
}
90
/// Persistent state file, and associated data
///
/// Stored as `ReplayLog.file`.
#[derive(Debug)]
pub(crate) struct PersistFile {
    /// A file logging fingerprints of the messages we have seen.
    ///
    /// (Writes are buffered; use [`ReplayLog::flush`] to push them to the OS.)
    file: BufWriter<File>,
    /// Whether we had a possible partial write
    ///
    /// See the comment inside [`ReplayLog::check_for_replay`].
    /// `Ok` means all is well.
    /// `Err` means we may have written partial data to the actual file,
    /// and need to make sure we're back at a record boundary.
    needs_resynch: Result<(), ()>,
    /// Filesystem lock which must not be released until after we finish writing
    ///
    /// Must come last so that the drop order is correct
    /// (the file is flushed and closed before the lock is released).
    #[allow(dead_code)] // Held just so we unlock on drop
    lock: Arc<LockFileGuard>,
}

/// Replay log files have a `.bin` suffix.
///
/// The name of the file is determined by [`ReplayLogType::format_filename`].
const REPLAY_LOG_SUFFIX: &str = ".bin";
116
impl<T: ReplayLogType> ReplayLog<T> {
    /// Create a new ReplayLog not backed by any data storage.
    ///
    /// Fingerprints are kept only in memory, and are lost when this value
    /// is dropped.
    #[allow(dead_code)] // TODO #1186 Remove once something uses ReplayLog.
    pub(crate) fn new_ephemeral() -> Self {
        Self {
            seen: data::Filter::new(),
            file: None,
            replay_log_type: PhantomData,
        }
    }

    /// Create a ReplayLog backed by the file at a given path.
    ///
    /// If the file already exists, load its contents and append any new
    /// contents to it; otherwise, create the file.
    ///
    /// **`lock` must already have been locked** and this
    /// *cannot be assured by the type system*.
    ///
    /// # Limitations
    ///
    /// It is the caller's responsibility to make sure that there are never two
    /// `ReplayLogs` open at once for the same path, or for two paths that
    /// resolve to the same file.
    pub(crate) fn new_logged(
        dir: &InstanceRawSubdir,
        name: &T::Name,
    ) -> Result<Self, OpenReplayLogError> {
        let leaf = T::format_filename(name);
        let path = dir.as_path().join(leaf);
        let lock_guard = dir.raw_lock_guard();

        // Wrap any I/O error with the path we were working on.
        Self::new_logged_inner(&path, lock_guard).map_err(|error| OpenReplayLogError {
            file: path,
            error: error.into(),
        })
    }

    /// Inner function for `new_logged`, with reified arguments and raw error type
    fn new_logged_inner(path: impl AsRef<Path>, lock: Arc<LockFileGuard>) -> io::Result<Self> {
        let mut file = {
            let mut options = OpenOptions::new();
            options.read(true).write(true).create(true);

            #[cfg(target_family = "unix")]
            {
                use std::os::unix::fs::OpenOptionsExt as _;
                // Owner read/write only: the log records message fingerprints.
                options.mode(0o600);
            }

            options.open(path)?
        };

        // If the file is new, we need to write the magic string. Else we must
        // read it.
        let file_len = file.metadata()?.len();
        if file_len == 0 {
            file.write_all(T::MAGIC)?;
        } else {
            let mut m = [0_u8; MAGIC_LEN];
            file.read_exact(&mut m)?;
            if &m != T::MAGIC {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    LogContentError::UnrecognizedFormat,
                ));
            }

            // Drop any trailing partial record left by an interrupted write.
            Self::truncate_to_multiple(&mut file, file_len)?;
        }

        // Now read the rest of the file, loading each stored fingerprint
        // into the in-memory filter.
        let mut seen = data::Filter::new();
        let mut r = BufReader::new(file);
        loop {
            let mut msg = [0_u8; OUTPUT_LEN];
            match r.read_exact(&mut msg) {
                Ok(()) => {
                    let _ = seen.test_and_add(&msg); // ignore error: a duplicate on disk is harmless here.
                }
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }
        }
        let mut file = r.into_inner();
        // Position at the end so subsequent writes append new records.
        file.seek(SeekFrom::End(0))?;

        let file = PersistFile {
            file: BufWriter::new(file),
            needs_resynch: Ok(()),
            lock,
        };

        Ok(Self {
            seen,
            file: Some(file),
            replay_log_type: PhantomData,
        })
    }

    /// Truncate `file` to contain a whole number of records
    ///
    /// `current_len` should have come from `file.metadata()`.
    // If the file's length is not an even multiple of OUTPUT_LEN after the MAGIC, truncate it.
    fn truncate_to_multiple(file: &mut File, current_len: u64) -> io::Result<()> {
        let excess = (current_len - T::MAGIC.len() as u64) % (OUTPUT_LEN as u64);
        if excess != 0 {
            file.set_len(current_len - excess)?;
        }
        Ok(())
    }

    /// Test whether we have already seen `message`.
    ///
    /// If we have seen it, return `Err(ReplayError::AlreadySeen)`. (Since this
    /// is a probabilistic data structure, there is a chance of returning this
    /// error even if we have _not_ seen this particular message)
    ///
    /// Otherwise, return `Ok(())`.
    ///
    /// On success, the fingerprint is recorded in the in-memory filter and,
    /// if this log is persistent, appended to the on-disk log.
    pub(crate) fn check_for_replay(&mut self, message: &T::Message) -> Result<(), ReplayError> {
        let h = T::transform_message(message);
        // Note: the in-memory filter is updated first; even if the disk write
        // below fails, the message counts as seen for this process's lifetime.
        self.seen.test_and_add(&h)?;
        if let Some(f) = self.file.as_mut() {
            (|| {
                // If write_all fails, it might have written part of the data;
                // in that case, we must truncate the file to resynchronise.
                // We set a note to truncate just before we call write_all
                // and clear it again afterwards.
                //
                // But, first, we need to deal with any previous note we left ourselves.

                // (With the current implementation of std::io::BufWriter, this is
                // unnecessary, because if the argument to write_all is smaller than
                // the buffer size, BufWriter::write_all always just copies to the buffer,
                // flushing first if necessary; and when it flushes, it uses write,
                // not write_all. So the use of write_all never causes "lost" data.
                // However, this is not a documented guarantee.)
                match f.needs_resynch {
                    Ok(()) => {}
                    Err(()) => {
                        // We're going to reach behind the BufWriter, so we need to make
                        // sure it's in synch with the underlying File.
                        f.file.flush()?;
                        let inner = f.file.get_mut();
                        let len = inner.metadata()?.len();
                        Self::truncate_to_multiple(inner, len)?;
                        // cursor is now past end, must reset (see std::fs::File::set_len)
                        inner.seek(SeekFrom::End(0))?;
                    }
                }
                f.needs_resynch = Err(());

                f.file.write_all(&h[..])?;

                f.needs_resynch = Ok(());

                Ok(())
            })()
            .map_err(|e| ReplayError::Log(Arc::new(e)))?;
        }
        Ok(())
    }

    /// Flush any buffered data to disk.
    ///
    /// (A no-op for an ephemeral log.)
    #[allow(dead_code)] // TODO #1208
    pub(crate) fn flush(&mut self) -> Result<(), io::Error> {
        if let Some(f) = self.file.as_mut() {
            f.file.flush()?;
        }
        Ok(())
    }

    /// Tries to parse a filename in the replay logs directory
    ///
    /// If the leafname refers to a file that would be created by
    /// [`ReplayLog::new_logged`], returns the name as a Rust type.
    ///
    /// Otherwise returns an error explaining why it isn't,
    /// as a plain string (for logging).
    pub(crate) fn parse_log_leafname(leaf: &OsStr) -> Result<T::Name, Cow<'static, str>> {
        T::parse_log_leafname(leaf)
    }
}
300
301/// Wrapper around a fast-ish data structure for detecting replays with some
302/// false positive rate. Bloom filters, cuckoo filters, and xorf filters are all
303/// an option here. You could even use a HashSet.
304///
305/// We isolate this code to make it easier to replace.
306mod data {
307 use super::{OUTPUT_LEN, ReplayError};
308 use growable_bloom_filter::GrowableBloom;
309
310 /// A probabilistic membership filter.
311 pub(super) struct Filter(pub(crate) GrowableBloom);
312
313 impl Filter {
314 /// Create a new empty filter
315 pub(super) fn new() -> Self {
316 // TODO: Perhaps we should make the capacity here tunable, based on
317 // the number of entries we expect. These values are more or less
318 // pulled out of thin air.
319 let desired_error_prob = 1.0 / 100_000.0;
320 let est_insertions = 100_000;
321 Filter(GrowableBloom::new(desired_error_prob, est_insertions))
322 }
323
324 /// Try to add `msg` to this filter if it isn't already there.
325 ///
326 /// Return Ok(()) or Err(AlreadySeen).
327 pub(super) fn test_and_add(&mut self, msg: &[u8; OUTPUT_LEN]) -> Result<(), ReplayError> {
328 if self.0.insert(&msg[..]) {
329 Ok(())
330 } else {
331 Err(ReplayError::AlreadySeen)
332 }
333 }
334 }
335}
336
/// A problem that prevents us from reading a ReplayLog from disk.
///
/// (This only exists so we can wrap it up in an [`io::Error`])
#[derive(thiserror::Error, Clone, Debug)]
enum LogContentError {
    /// The magic number on the log file was incorrect.
    ///
    /// (Returned when the file's leading bytes do not match
    /// [`ReplayLogType::MAGIC`].)
    #[error("unrecognized data format")]
    UnrecognizedFormat,
}
346
347/// An error occurred while checking whether we've seen an element before.
348#[derive(thiserror::Error, Clone, Debug)]
349pub(crate) enum ReplayError {
350 /// We have already seen this item.
351 #[error("Already seen")]
352 AlreadySeen,
353
354 /// We were unable to record this item in the log.
355 #[error("Unable to log data")]
356 Log(Arc<std::io::Error>),
357}
358
/// Error occurred while opening replay log.
///
/// (Returned by [`ReplayLog::new_logged`].)
#[derive(thiserror::Error, Clone, Debug)]
#[error("unable to open replay log: {file:?}")]
pub struct OpenReplayLogError {
    /// What filesystem object we tried to do it to
    pub(crate) file: PathBuf,
    /// What happened
    #[source]
    pub(crate) error: Arc<io::Error>,
}
369
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use crate::test::mk_state_instance;
    use rand::Rng;
    use test_temp_dir::{TestTempDir, TestTempDirGuard, test_temp_dir};

    /// A minimal [`ReplayLogType`] implementation for testing.
    struct TestReplayLogType;

    type TestReplayLog = ReplayLog<TestReplayLogType>;

    impl ReplayLogType for TestReplayLogType {
        type Name = IptLocalId;
        type Message = [u8; OUTPUT_LEN];

        const MAGIC: &'static [u8; MAGIC_LEN] = b"<tor test replay>\n\0\0\0\0\0\0\0\0\0\0\0\0\0\0";

        fn format_filename(name: &IptLocalId) -> String {
            format!("{name}{REPLAY_LOG_SUFFIX}")
        }

        fn transform_message(message: &[u8; OUTPUT_LEN]) -> [u8; OUTPUT_LEN] {
            message.clone()
        }

        fn parse_log_leafname(leaf: &OsStr) -> Result<IptLocalId, Cow<'static, str>> {
            let leaf = leaf.to_str().ok_or("not proper unicode")?;
            let lid = leaf.strip_suffix(REPLAY_LOG_SUFFIX).ok_or("not *.bin")?;
            let lid: IptLocalId = lid
                .parse()
                .map_err(|e: crate::InvalidIptLocalId| e.to_string())?;
            Ok(lid)
        }
    }

    /// Generate a random `OUTPUT_LEN`-byte message.
    fn rand_msg<R: Rng>(rng: &mut R) -> [u8; OUTPUT_LEN] {
        rng.random()
    }

    /// Basic tests on an ephemeral IptReplayLog.
    #[test]
    fn simple_usage() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let group_1: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let group_2: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let mut log = TestReplayLog::new_ephemeral();
        // Add everything in group 1.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        // Make sure that everything in group 1 is still there.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_err());
        }
        // Make sure that group 2 is detected as not-there.
        for msg in &group_2 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
    }

    const TEST_TEMP_SUBDIR: &str = "replaylog";

    /// Open (or create) a persistent `TestReplayLog` in `dir`.
    fn create_logged(dir: &TestTempDir) -> TestTempDirGuard<TestReplayLog> {
        dir.subdir_used_by(TEST_TEMP_SUBDIR, |dir| {
            let inst = mk_state_instance(&dir, "allium");
            let raw = inst.raw_subdir("iptreplay").unwrap();
            TestReplayLog::new_logged(&raw, &IptLocalId::dummy(1)).unwrap()
        })
    }

    /// Basic tests on an persistent IptReplayLog.
    #[test]
    fn logging_basics() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let group_1: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let group_2: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let dir = test_temp_dir!();
        let mut log = create_logged(&dir);
        // Add everything in group 1, then close and reload.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        let mut log = create_logged(&dir);
        // Make sure everything in group 1 is still there.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_err());
        }
        // Now add everything in group 2, then close and reload.
        for msg in &group_2 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        let mut log = create_logged(&dir);
        // Make sure that groups 1 and 2 are still there.
        for msg in group_1.iter().chain(group_2.iter()) {
            assert!(log.check_for_replay(msg).is_err());
        }
    }

    /// Test for a log that gets truncated mid-write.
    #[test]
    fn test_truncated() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let group_1: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let group_2: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let dir = test_temp_dir!();
        let mut log = create_logged(&dir);
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        // Truncate the file by 7 bytes.
        dir.subdir_used_by(TEST_TEMP_SUBDIR, |dir| {
            let path = dir.join(format!("hss/allium/iptreplay/{}.bin", IptLocalId::dummy(1)));
            let file = OpenOptions::new().write(true).open(path).unwrap();
            // Make sure that the file has the length we expect.
            let expected_len = MAGIC_LEN + OUTPUT_LEN * group_1.len();
            assert_eq!(expected_len as u64, file.metadata().unwrap().len());
            file.set_len((expected_len - 7) as u64).unwrap();
        });
        // Now, reload the log. We should be able to recover every non-truncated
        // item...
        let mut log = create_logged(&dir);
        for msg in &group_1[..group_1.len() - 1] {
            assert!(log.check_for_replay(msg).is_err());
        }
        // But not the last one, which we truncated. (Checking will add it, though.)
        assert!(
            log.check_for_replay(&group_1[group_1.len() - 1]).is_ok(),
            "False positive"
        );
        // Now add everything in group 2, then close and reload.
        for msg in &group_2 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        let mut log = create_logged(&dir);
        // Make sure that groups 1 and 2 are still there.
        for msg in group_1.iter().chain(group_2.iter()) {
            assert!(log.check_for_replay(msg).is_err());
        }
    }

    /// Test for a partial write
    #[test]
    #[cfg(target_os = "linux")] // different platforms have different definitions of sigaction
    fn test_partial_write() {
        use std::env;
        use std::os::unix::process::ExitStatusExt;
        use std::process::Command;

        // TODO this contraption should perhaps be productised and put somewhere else

        const ENV_NAME: &str = "TOR_HSSERVICE_TEST_PARTIAL_WRITE_SUBPROCESS";
        // for a wait status different from any of libtest's
        const GOOD_SIGNAL: i32 = libc::SIGUSR2;

        let sigemptyset = || unsafe {
            let mut set = MaybeUninit::uninit();
            libc::sigemptyset(set.as_mut_ptr());
            set.assume_init()
        };

        // Check that SIGUSR2 starts out as SIG_DFL and unblocked
        //
        // We *reject* such situations, rather than fixing them up, because this is an
        // irregular and broken environment that can cause arbitrarily weird behaviours.
        // Programs on Unix are entitled to assume that their signal dispositions are
        // SIG_DFL on entry, with signals unblocked. (With a few exceptions.)
        //
        // So we want to detect and report any such environment, not let it slide.
        unsafe {
            let mut sa = MaybeUninit::uninit();
            let r = libc::sigaction(GOOD_SIGNAL, ptr::null(), sa.as_mut_ptr());
            assert_eq!(r, 0);
            let sa = sa.assume_init();
            assert_eq!(
                sa.sa_sigaction,
                libc::SIG_DFL,
                "tests running in broken environment (SIGUSR2 not SIG_DFL)"
            );

            let empty_set = sigemptyset();
            let mut current_set = MaybeUninit::uninit();
            let r = libc::sigprocmask(
                libc::SIG_UNBLOCK,
                (&empty_set) as _,
                current_set.as_mut_ptr(),
            );
            assert_eq!(r, 0);
            let current_set = current_set.assume_init();
            // (Fixed mojibake: this argument was garbled to `(¤t_set)`.)
            let blocked = libc::sigismember((&current_set) as _, GOOD_SIGNAL);
            assert_eq!(
                blocked, 0,
                "tests running in broken environment (SIGUSR2 blocked)"
            );
        }

        match env::var(ENV_NAME) {
            Err(env::VarError::NotPresent) => {
                eprintln!("in test runner process, forking..,");
                let output = Command::new(env::current_exe().unwrap())
                    .args(["--nocapture", "replay::test::test_partial_write"])
                    .env(ENV_NAME, "1")
                    .output()
                    .unwrap();
                let print_output = |prefix, data| match std::str::from_utf8(data) {
                    Ok(s) => {
                        for l in s.split("\n") {
                            eprintln!(" {prefix} {l}");
                        }
                    }
                    Err(e) => eprintln!(" UTF-8 ERROR {prefix} {e}"),
                };
                print_output("!", &output.stdout);
                print_output(">", &output.stderr);
                let st = output.status;
                eprintln!("reaped actual test process {st:?} (expecting signal {GOOD_SIGNAL})");
                assert_eq!(st.signal(), Some(GOOD_SIGNAL));
                return;
            }
            Ok(y) if y == "1" => {}
            other => panic!("bad env var {ENV_NAME:?} {other:?}"),
        };

        // Now we are in our own process, and can mess about with ulimit etc.

        use std::fs;
        use std::mem::MaybeUninit;
        use std::ptr;

        fn set_ulimit(size: usize) {
            unsafe {
                use libc::RLIMIT_FSIZE;
                let mut rlim = libc::rlimit {
                    rlim_cur: 0,
                    rlim_max: 0,
                };
                let r = libc::getrlimit(RLIMIT_FSIZE, (&mut rlim) as _);
                assert_eq!(r, 0);
                rlim.rlim_cur = size.try_into().unwrap();
                let r = libc::setrlimit(RLIMIT_FSIZE, (&rlim) as _);
                assert_eq!(r, 0);
            }
        }

        // This test is quite complicated.
        //
        // We want to test partial writes. We could perhaps have done this by
        // parameterising IptReplayLog so it could have something other than File,
        // but that would probably leak into the public API.
        //
        // Instead, we cause *actual* partial writes. We use the Unix setrlimit
        // call to limit the size of files our process is allowed to write.
        // This causes the underlying write(2) calls to (i) generate SIGXFSZ
        // (ii) if that doesn't kill the process, return partial writes.

        test_temp_dir!().used_by(|dir| {
            let path = dir.join("test.log");
            let lock = LockFileGuard::lock(dir.join("dummy.lock")).unwrap();
            let lock = Arc::new(lock);
            let mut rl = TestReplayLog::new_logged_inner(&path, lock.clone()).unwrap();

            const BUF: usize = 8192; // BufWriter default; if that changes, test will break

            // We let ourselves write one whole buffer plus an odd amount of extra
            const ALLOW: usize = BUF + 37;

            // Ignore SIGXFSZ (default disposition is for exceeding the rlimit to kill us)
            unsafe {
                let sa = libc::sigaction {
                    sa_sigaction: libc::SIG_IGN,
                    sa_mask: sigemptyset(),
                    sa_flags: 0,
                    sa_restorer: None,
                };
                let r = libc::sigaction(libc::SIGXFSZ, (&sa) as _, ptr::null_mut());
                assert_eq!(r, 0);
            }

            let demand_efbig = |e| match e {
                ReplayError::Log(e) if e.kind() == io::ErrorKind::FileTooLarge => {}
                other => panic!("expected EFBIG, got {other:?}"),
            };

            // Generate a distinct message given a phase and a counter
            #[allow(clippy::identity_op)]
            let mk_msg = |phase: u8, i: usize| {
                let i = u32::try_from(i).unwrap();
                let mut msg = [0_u8; OUTPUT_LEN];
                msg[0] = phase;
                msg[1] = phase;
                msg[4] = (i >> 24) as _;
                msg[5] = (i >> 16) as _;
                msg[6] = (i >> 8) as _;
                msg[7] = (i >> 0) as _;
                msg
            };

            // Number of hashes we can write to the file before failure occurs
            const CAN_DO: usize = (ALLOW + BUF - MAGIC_LEN) / OUTPUT_LEN;
            dbg!(MAGIC_LEN, OUTPUT_LEN, BUF, ALLOW, CAN_DO);

            // Record of the hashes that TestReplayLog tells us were OK and not replays;
            // ie, which it therefore ought to have recorded.
            let mut gave_ok = Vec::new();

            set_ulimit(ALLOW);

            for i in 0..CAN_DO {
                let h = mk_msg(b'y', i);
                rl.check_for_replay(&h).unwrap();
                gave_ok.push(h);
            }

            let md = fs::metadata(&path).unwrap();
            dbg!(md.len(), &rl.file);

            // Now we have written what we can. The next two calls will fail,
            // since the BufWriter buffer is full and can't be flushed.

            for i in 0..2 {
                eprintln!("expecting EFBIG {i}");
                demand_efbig(rl.check_for_replay(&mk_msg(b'n', i)).unwrap_err());
                let md = fs::metadata(&path).unwrap();
                assert_eq!(md.len(), u64::try_from(ALLOW).unwrap());
            }

            // Enough that we don't get any further file size exceedances
            set_ulimit(ALLOW * 10);

            // Now we should be able to recover. We write two more hashes.
            for i in 0..2 {
                eprintln!("recovering {i}");
                let h = mk_msg(b'r', i);
                rl.check_for_replay(&h).unwrap();
                gave_ok.push(h);
            }

            // flush explicitly just so we catch any error
            // (drop would flush, but it can't report errors)
            rl.flush().unwrap();
            drop(rl);

            // Reopen the log - reading in the written data.
            // We can then check that everything the earlier IptReplayLog
            // claimed to have written, is indeed recorded.

            let mut rl = TestReplayLog::new_logged_inner(&path, lock.clone()).unwrap();
            for msg in &gave_ok {
                match rl.check_for_replay(msg) {
                    Err(ReplayError::AlreadySeen) => {}
                    other => panic!("expected AlreadySeen, got {other:?}"),
                }
            }

            eprintln!("recovered file contents checked, all good");
        });

        unsafe {
            libc::raise(libc::SIGUSR2);
        }
        panic!("we survived raise SIGUSR2");
    }
}