Skip to main content

tor_dirmgr/storage/
sqlite.rs

1//! Net document storage backed by sqlite3.
2//!
3//! We store most objects in sqlite tables, except for very large ones,
4//! which we store as "blob" files in a separate directory.
5
6use super::ExpirationConfig;
7use crate::docmeta::{AuthCertMeta, ConsensusMeta};
8use crate::err::ReadOnlyStorageError;
9use crate::storage::{InputString, Store};
10use crate::{Error, Result};
11
12use fs_mistrust::CheckedDir;
13use tor_basic_utils::PathExt as _;
14use tor_error::{into_internal, warn_report};
15use tor_netdoc::doc::authcert::AuthCertKeyIds;
16use tor_netdoc::doc::microdesc::MdDigest;
17use tor_netdoc::doc::netstatus::{ConsensusFlavor, Lifetime};
18#[cfg(feature = "routerdesc")]
19use tor_netdoc::doc::routerdesc::RdDigest;
20use web_time_compat::SystemTimeExt;
21
22#[cfg(feature = "bridge-client")]
23pub(crate) use {crate::storage::CachedBridgeDescriptor, tor_guardmgr::bridge::BridgeConfig};
24
25use std::collections::{HashMap, HashSet};
26use std::fs::OpenOptions;
27use std::path::{Path, PathBuf};
28use std::result::Result as StdResult;
29use std::sync::Arc;
30use std::time::SystemTime;
31
32use rusqlite::{OpenFlags, OptionalExtension, Transaction, params};
33use time::OffsetDateTime;
34use tracing::{trace, warn};
35
36#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
37use fslock::LockFile;
38
/// Local directory cache using a Sqlite3 connection.
pub(crate) struct SqliteStore {
    /// Connection to the sqlite3 database.
    conn: rusqlite::Connection,
    /// Location for the sqlite3 database; used to reopen it.
    ///
    /// This is `None` when the store was built directly from an existing
    /// connection (as in tests); see `from_conn_internal`.
    sql_path: Option<PathBuf>,
    /// Location to store blob files.
    blob_dir: CheckedDir,
    /// Lockfile to prevent concurrent write attempts from different
    /// processes.
    ///
    /// If this is None we aren't using a lockfile.  Watch out!
    ///
    /// (sqlite supports that with connection locking, but we want to
    /// be a little more coarse-grained here)
    lockfile: Option<LockFile>,
}
56
/// Wasm-only: a non-implementation of LockFile.
///
/// Wraps a [`void::Void`], so no value of this type can ever exist;
/// every method on it is therefore statically unreachable.
///
/// TODO #2106 -- remove this when we migrate to use File::lock.
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
struct LockFile(void::Void);
62
#[allow(clippy::missing_docs_in_private_items)]
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl LockFile {
    /// Always fails with `Unsupported`: file locking does not exist on this target.
    fn open<P: AsRef<Path>>(_path: P) -> std::io::Result<Self> {
        Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
    }

    /// Unreachable: no `LockFile` can exist, since `open` always fails.
    fn try_lock(&mut self) -> std::io::Result<bool> {
        void::unreachable(self.0)
    }

    /// Unreachable: no `LockFile` can exist, since `open` always fails.
    fn unlock(&mut self) -> std::io::Result<()> {
        void::unreachable(self.0)
    }

    /// Unreachable: no `LockFile` can exist, since `open` always fails.
    fn owns_lock(&self) -> bool {
        void::unreachable(self.0)
    }
}
82
/// # Some notes on blob consistency, and the lack thereof.
///
/// We store large documents (currently, consensuses) in separate files,
/// called "blobs",
/// outside of the sqlite database.
/// We do this for performance reasons: for large objects,
/// mmap is far more efficient than sqlite in RAM and CPU.
///
/// In the sqlite database, we keep track of our blobs
/// using the ExtDocs table.
/// This scheme makes it possible for the blobs and the table
/// to get out of sync.
///
/// In summary:
///   - _Vanished_ blobs (ones present only in ExtDocs) are possible;
///     we try to tolerate them.
///   - _Orphaned_ blobs (ones present only on the disk) are possible;
///     we try to tolerate them.
///   - _Corrupted_ blobs (ones with the wrong contents) are possible
///     but (we hope) unlikely;
///     we do not currently try to tolerate them.
///
/// In more detail:
///
/// Here are the practices we use when _writing_ blobs:
///
/// - We always create a blob before updating the ExtDocs table,
///   and remove an entry from the ExtDocs before deleting the blob.
/// - If we decide to roll back the transaction that adds the row to ExtDocs,
///   we delete the blob after doing so.
/// - We use [`CheckedDir::write_and_replace`] to store blobs,
///   so a half-formed blob shouldn't be common.
///   (We assume that "close" and "rename" are serialized by the OS,
///   so that _if_ the rename happens, the file is completely written.)
/// - Blob filenames include a digest of the file contents,
///   so collisions are unlikely.
///
/// Here are the practices we use when _deleting_ blobs:
/// - First, we drop the row from the ExtDocs table.
///   Only then do we delete the file.
///
/// These practices can result in _orphaned_ blobs
/// (ones with no row in the ExtDocs table),
/// or in _half-written_ blob files with tempfile names
/// (which also have no row in the ExtDocs table).
/// This happens if we crash at the wrong moment.
/// Such blobs can be safely removed;
/// we do so in [`SqliteStore::remove_unreferenced_blobs`].
///
/// Despite our efforts, _vanished_ blobs
/// (entries in the ExtDocs table with no corresponding file)
/// are also possible.  They could happen for these reasons:
/// - The filesystem might not serialize or sync things in a way that's
///   consistent with the DB.
/// - An automatic process might remove random cache files.
/// - The user might run around deleting things to free space.
///
/// We try to tolerate vanished blobs.
///
/// _Corrupted_ blobs are also possible.  They can happen on FS corruption,
/// or on somebody messing around with the cache directory manually.
/// We do not attempt to tolerate corrupted blobs.
///
/// ## On trade-offs
///
/// TODO: The practices described above are more likely
/// to create _orphaned_ blobs than _vanished_ blobs.
/// We initially made this trade-off decision on the mistaken theory
/// that we could avoid vanished blobs entirely.
/// We _may_ want to revisit this choice,
/// on the rationale that we can respond to vanished blobs as soon as we notice they're gone,
/// whereas we can only handle orphaned blobs with a periodic cleanup.
/// On the other hand, since we need to handle both cases,
/// it may not matter very much in practice.
#[allow(unused)]
mod blob_consistency {}
159
/// Specific error returned when a blob will not be read.
///
/// This error is an internal type: it's never returned to the user.
#[derive(Debug)]
enum AbsentBlob {
    /// We did not find a blob file on the disk.
    ///
    /// (The ExtDocs table pointed at a file that has _vanished_;
    /// see [`blob_consistency`].)
    VanishedFile,
    /// We did not even find a blob to read in ExtDocs.
    NothingToRead,
}
170
171impl SqliteStore {
    /// Construct or open a new SqliteStore at some location on disk.
    /// The provided location must be a directory, or a possible
    /// location for a directory: the directory will be created if
    /// necessary.
    ///
    /// If readonly is true, the result will be a read-only store.
    /// Otherwise, when readonly is false, the result may be
    /// read-only or read-write, depending on whether we can acquire
    /// the lock.
    ///
    /// # Limitations:
    ///
    /// The file locking that we use to ensure that only one dirmgr is
    /// writing to a given storage directory at a time is currently
    /// _per process_. Therefore, you might get unexpected results if
    /// two SqliteStores are created in the same process with the
    /// same path.
    pub(crate) fn from_path_and_mistrust<P: AsRef<Path>>(
        path: P,
        mistrust: &fs_mistrust::Mistrust,
        mut readonly: bool,
    ) -> Result<Self> {
        let path = path.as_ref();
        // On-disk layout: the database, the blob directory, and the lockfile
        // all live side by side under `path`.
        let sqlpath = path.join("dir.sqlite3");
        let blobpath = path.join("dir_blobs/");
        let lockpath = path.join("dir.lock");

        let verifier = mistrust.verifier().permit_readable().check_content();

        // In read-only mode the blob directory must already exist; otherwise
        // we may create it.
        let blob_dir = if readonly {
            verifier.secure_dir(blobpath)?
        } else {
            verifier.make_secure_dir(blobpath)?
        };

        // Check permissions on the sqlite and lock files; don't require them to
        // exist.
        for p in [&lockpath, &sqlpath] {
            match mistrust
                .verifier()
                .permit_readable()
                .require_file()
                .check(p)
            {
                Ok(()) | Err(fs_mistrust::Error::NotFound(_)) => {}
                Err(e) => return Err(e.into()),
            }
        }

        // If we wanted read-write access but another process holds the lock,
        // we silently fall back to read-only (as documented above).
        let mut lockfile = LockFile::open(&lockpath).map_err(Error::from_lockfile)?;
        if !readonly && !lockfile.try_lock().map_err(Error::from_lockfile)? {
            readonly = true; // we couldn't get the lock!
        };
        let flags = if readonly {
            OpenFlags::SQLITE_OPEN_READ_ONLY
        } else {
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
        };
        let conn = rusqlite::Connection::open_with_flags(&sqlpath, flags)?;
        let mut store = SqliteStore::from_conn_internal(conn, blob_dir, readonly)?;
        store.sql_path = Some(sqlpath);
        store.lockfile = Some(lockfile);
        Ok(store)
    }
236
    /// Construct a new SqliteStore from a database connection and a location
    /// for blob files.
    ///
    /// Used for testing with a memory-backed database.
    ///
    /// Note: `blob_dir` must not be used for anything other than storing the blobs associated with
    /// this database, since we will freely remove unreferenced files from this directory.
    #[cfg(test)]
    fn from_conn(conn: rusqlite::Connection, blob_dir: CheckedDir) -> Result<Self> {
        // Tests always open the store in read-write mode.
        Self::from_conn_internal(conn, blob_dir, false)
    }
248
249    /// Construct a new SqliteStore from a database connection and a location
250    /// for blob files.
251    ///
252    /// The `readonly` argument specifies whether the database connection should be read-only.
253    fn from_conn_internal(
254        conn: rusqlite::Connection,
255        blob_dir: CheckedDir,
256        readonly: bool,
257    ) -> Result<Self> {
258        // sqlite (as of Jun 2024) does not enforce foreign keys automatically unless you set this
259        // pragma on the connection.
260        conn.pragma_update(None, "foreign_keys", "ON")?;
261
262        let mut result = SqliteStore {
263            conn,
264            blob_dir,
265            lockfile: None,
266            sql_path: None,
267        };
268
269        result.check_schema(readonly)?;
270
271        Ok(result)
272    }
273
    /// Check whether this database has a schema format we can read, and
    /// install or upgrade the schema if necessary.
    fn check_schema(&mut self, readonly: bool) -> Result<()> {
        let tx = self.conn.transaction()?;
        // Count the user tables; zero means the database is brand new.
        let db_n_tables: u32 = tx.query_row(
            "SELECT COUNT(name) FROM sqlite_master
             WHERE type='table'
             AND name NOT LIKE 'sqlite_%'",
            [],
            |row| row.get(0),
        )?;
        let db_exists = db_n_tables > 0;

        // Update the schema from current_vsn to the latest (does not commit)
        //
        // UPDATE_SCHEMA[i] migrates from version i to version i+1; we apply
        // every migration newer than `current_vsn`, bumping the recorded
        // version after each one.
        let update_schema = |tx: &rusqlite::Transaction, current_vsn| {
            for (from_vsn, update) in UPDATE_SCHEMA.iter().enumerate() {
                let from_vsn = u32::try_from(from_vsn).expect("schema version >2^32");
                let new_vsn = from_vsn + 1;
                if current_vsn < new_vsn {
                    tx.execute_batch(update)?;
                    tx.execute(UPDATE_SCHEMA_VERSION, params![new_vsn, new_vsn])?;
                }
            }
            Ok::<_, Error>(())
        };

        if !db_exists {
            if !readonly {
                // Fresh database: install the v0 schema, then migrate forward.
                tx.execute_batch(INSTALL_V0_SCHEMA)?;
                update_schema(&tx, 0)?;
                tx.commit()?;
            } else {
                // The other process should have created the database!
                return Err(Error::ReadOnlyStorage(ReadOnlyStorageError::NoDatabase));
            }
            return Ok(());
        }

        // `version` is the schema actually installed; `readable_by` is the
        // oldest code version that can still read this database.
        let (version, readable_by): (u32, u32) = tx.query_row(
            "SELECT version, readable_by FROM TorSchemaMeta
             WHERE name = 'TorDirStorage'",
            [],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )?;

        if version < SCHEMA_VERSION {
            if !readonly {
                update_schema(&tx, version)?;
                tx.commit()?;
            } else {
                // We can't upgrade the schema of a read-only database.
                return Err(Error::ReadOnlyStorage(
                    ReadOnlyStorageError::IncompatibleSchema {
                        schema: version,
                        supported: SCHEMA_VERSION,
                    },
                ));
            }

            return Ok(());
        } else if readable_by > SCHEMA_VERSION {
            // The database is too new for this code to read safely.
            return Err(Error::UnrecognizedSchema {
                schema: readable_by,
                supported: SCHEMA_VERSION,
            });
        }

        // rolls back the transaction, but nothing was done.
        Ok(())
    }
343
344    /// Read a blob from disk, mapping it if possible.
345    ///
346    /// Return `Ok(Err(.))` if the file for the blob was not found on disk;
347    /// returns an error in other cases.
348    ///
349    /// (See [`blob_consistency`] for information on why the blob might be absent.)
350    fn read_blob(&self, path: &str) -> Result<StdResult<InputString, AbsentBlob>> {
351        let file = match self.blob_dir.open(path, OpenOptions::new().read(true)) {
352            Ok(file) => file,
353            Err(fs_mistrust::Error::NotFound(_)) => {
354                warn!(
355                    "{:?} was listed in the database, but its corresponding file had been deleted",
356                    path
357                );
358                return Ok(Err(AbsentBlob::VanishedFile));
359            }
360            Err(e) => return Err(e.into()),
361        };
362
363        InputString::load(file)
364            .map_err(|err| Error::CacheFile {
365                action: "loading",
366                fname: PathBuf::from(path),
367                error: Arc::new(err),
368            })
369            .map(Ok)
370    }
371
    /// Write a file to disk as a blob, and record it in the ExtDocs table.
    ///
    /// Return a SavedBlobHandle that describes where the blob is, and which
    /// can be used either to commit the blob or delete it.
    ///
    /// See [`blob_consistency`] for more information on guarantees.
    fn save_blob_internal(
        &mut self,
        contents: &[u8],
        doctype: &str,
        digest_type: &str,
        digest: &[u8],
        expires: OffsetDateTime,
    ) -> Result<blob_handle::SavedBlobHandle<'_>> {
        // Filename is "<doctype>_<digest_type>-<hex digest>"; embedding the
        // digest makes collisions unlikely (see blob_consistency).
        let digest = hex::encode(digest);
        let digeststr = format!("{}-{}", digest_type, digest);
        let fname = format!("{}_{}", doctype, digeststr);

        let full_path = self.blob_dir.join(&fname)?;
        // NOTE(review): the Unlinker presumably removes the file if the
        // handle is dropped without being committed -- confirm against the
        // blob_handle module.
        let unlinker = blob_handle::Unlinker::new(&full_path);
        // Write the blob *before* inserting its ExtDocs row, per the
        // ordering rules documented in [`blob_consistency`].
        self.blob_dir
            .write_and_replace(&fname, contents)
            .map_err(|e| match e {
                fs_mistrust::Error::Io { err, .. } => Error::CacheFile {
                    action: "saving",
                    fname: full_path,
                    error: err,
                },
                err => err.into(),
            })?;

        // Record the blob in ExtDocs inside an uncommitted transaction; the
        // caller decides whether to commit via the returned handle.
        let tx = self.conn.unchecked_transaction()?;
        tx.execute(INSERT_EXTDOC, params![digeststr, expires, doctype, fname])?;

        Ok(blob_handle::SavedBlobHandle::new(
            tx, fname, digeststr, unlinker,
        ))
    }
410
411    /// As `latest_consensus`, but do not retry.
412    fn latest_consensus_internal(
413        &self,
414        flavor: ConsensusFlavor,
415        pending: Option<bool>,
416    ) -> Result<StdResult<InputString, AbsentBlob>> {
417        trace!(?flavor, ?pending, "Loading latest consensus from cache");
418        let rv: Option<(OffsetDateTime, OffsetDateTime, String)> = match pending {
419            None => self
420                .conn
421                .query_row(FIND_CONSENSUS, params![flavor.name()], |row| row.try_into())
422                .optional()?,
423            Some(pending_val) => self
424                .conn
425                .query_row(
426                    FIND_CONSENSUS_P,
427                    params![pending_val, flavor.name()],
428                    |row| row.try_into(),
429                )
430                .optional()?,
431        };
432
433        if let Some((_va, _vu, filename)) = rv {
434            // TODO blobs: If the cache is inconsistent (because this blob is _vanished_), and the cache has not yet
435            // been cleaned, this may fail to find the latest consensus that we actually have.
436            self.read_blob(&filename)
437        } else {
438            Ok(Err(AbsentBlob::NothingToRead))
439        }
440    }
441
442    /// Save a blob to disk and commit it.
443    #[cfg(test)]
444    fn save_blob(
445        &mut self,
446        contents: &[u8],
447        doctype: &str,
448        digest_type: &str,
449        digest: &[u8],
450        expires: OffsetDateTime,
451    ) -> Result<String> {
452        let h = self.save_blob_internal(contents, doctype, digest_type, digest, expires)?;
453        let fname = h.fname().to_string();
454        h.commit()?;
455        Ok(fname)
456    }
457
    /// Return the valid-after time for the latest non-pending consensus.
    #[cfg(test)]
    // We should revise the tests to use latest_consensus_meta instead.
    fn latest_consensus_time(&self, flavor: ConsensusFlavor) -> Result<Option<OffsetDateTime>> {
        Ok(self
            .latest_consensus_meta(flavor)?
            .map(|m| m.lifetime().valid_after().into()))
    }
466
467    /// Remove the blob with name `fname`, but do not give an error on failure.
468    ///
469    /// See [`blob_consistency`]: we should call this only having first ensured
470    /// that the blob is removed from the ExtDocs table.
471    fn remove_blob_or_warn<P: AsRef<Path>>(&self, fname: P) {
472        let fname = fname.as_ref();
473        if let Err(e) = self.blob_dir.remove_file(fname) {
474            warn_report!(e, "Unable to remove {}", fname.display_lossy());
475        }
476    }
477
    /// Delete any blob files that are old enough, and not mentioned in the ExtDocs table.
    ///
    /// There shouldn't typically be any, but we don't want to let our cache grow infinitely
    /// if we have a bug.
    fn remove_unreferenced_blobs(
        &self,
        now: OffsetDateTime,
        expiration: &ExpirationConfig,
    ) -> Result<()> {
        // Now, look for any unreferenced blobs that are a bit old.
        for ent in self.blob_dir.read_directory(".")?.flatten() {
            // Wrap metadata-related I/O errors as cache-file errors.
            let md_error = |io_error| Error::CacheFile {
                action: "getting metadata",
                fname: ent.file_name().into(),
                error: Arc::new(io_error),
            };
            // Age check: only remove files older than the consensus
            // expiration interval, based on their mtime.
            if ent
                .metadata()
                .map_err(md_error)?
                .modified()
                .map_err(md_error)?
                + expiration.consensuses
                >= now
            {
                // this file is sufficiently recent that we should not remove it, just to be cautious.
                continue;
            }
            let filename = match ent.file_name().into_string() {
                Ok(s) => s,
                Err(os_str) => {
                    // This filename wasn't utf-8.  We will never create one of these.
                    warn!(
                        "Removing bizarre file '{}' from blob store.",
                        os_str.to_string_lossy()
                    );
                    self.remove_blob_or_warn(ent.file_name());
                    continue;
                }
            };
            // Keep the file if any ExtDocs row still references it.
            let found: (u32,) =
                self.conn
                    .query_row(COUNT_EXTDOC_BY_PATH, params![&filename], |row| {
                        row.try_into()
                    })?;
            if found == (0,) {
                warn!("Removing unreferenced file '{}' from blob store", &filename);
                self.remove_blob_or_warn(ent.file_name());
            }
        }

        Ok(())
    }
530
531    /// Remove any entry in the ExtDocs table for which a blob file is vanished.
532    ///
533    /// This method is `O(n)` in the size of the ExtDocs table and the size of the directory.
534    /// It doesn't take self, to avoid problems with the borrow checker.
535    fn remove_entries_for_vanished_blobs<'a>(
536        blob_dir: &CheckedDir,
537        tx: &Transaction<'a>,
538    ) -> Result<usize> {
539        let in_directory: HashSet<PathBuf> = blob_dir
540            .read_directory(".")?
541            .flatten()
542            .map(|dir_entry| PathBuf::from(dir_entry.file_name()))
543            .collect();
544        let in_db: Vec<String> = tx
545            .prepare(FIND_ALL_EXTDOC_FILENAMES)?
546            .query_map([], |row| row.get::<_, String>(0))?
547            .collect::<StdResult<Vec<String>, _>>()?;
548
549        let mut n_removed = 0;
550        for fname in in_db {
551            if in_directory.contains(Path::new(&fname)) {
552                // The blob is present; great!
553                continue;
554            }
555
556            n_removed += tx.execute(DELETE_EXTDOC_BY_FILENAME, [fname])?;
557        }
558
559        Ok(n_removed)
560    }
561}
562
563impl Store for SqliteStore {
564    fn is_readonly(&self) -> bool {
565        match &self.lockfile {
566            Some(f) => !f.owns_lock(),
567            None => false,
568        }
569    }
    fn upgrade_to_readwrite(&mut self) -> Result<bool> {
        // With no on-disk database path (e.g. a store built directly from a
        // connection), there is nothing to reopen: report success.
        let Some(sql_path) = self.sql_path.as_ref() else {
            return Ok(true);
        };

        if self.is_readonly() {
            let lf = self
                .lockfile
                .as_mut()
                .expect("No lockfile open; cannot upgrade to read-write storage");
            if !lf.try_lock().map_err(Error::from_lockfile)? {
                // Somebody else has the lock.
                return Ok(false);
            }
            // We got the lock; reopen the connection (read-write this time).
            match rusqlite::Connection::open(sql_path) {
                Ok(conn) => {
                    self.conn = conn;
                }
                Err(e) => {
                    // Reopening failed: release the lock we just took so
                    // another process can still become the writer.
                    if let Err(e2) = lf.unlock() {
                        warn_report!(
                            e2,
                            "Unable to release lock file while upgrading DB to read/write"
                        );
                    }
                    return Err(e.into());
                }
            }
        }
        Ok(true)
    }
    fn expire_all(&mut self, expiration: &ExpirationConfig) -> Result<()> {
        let tx = self.conn.transaction()?;
        // Collect the filenames of expired blobs *before* dropping their
        // rows, so we can delete the files after the commit.
        // This works around a false positive; see
        //   https://github.com/rust-lang/rust-clippy/issues/8114
        #[allow(clippy::let_and_return)]
        let expired_blobs: Vec<String> = {
            let mut stmt = tx.prepare(FIND_EXPIRED_EXTDOCS)?;
            let names: Vec<String> = stmt
                .query_map([], |row| row.get::<_, String>(0))?
                .collect::<StdResult<Vec<String>, _>>()?;
            names
        };

        let now = now_utc();
        tx.execute(DROP_OLD_EXTDOCS, [])?;

        // In theory bad system clocks might generate table rows with times far in the future.
        // However, for data which is cached here which comes from the network consensus,
        // we rely on the fact that no consensus from the future exists, so this can't happen.
        tx.execute(DROP_OLD_MICRODESCS, [now - expiration.microdescs])?;
        tx.execute(DROP_OLD_AUTHCERTS, [now - expiration.authcerts])?;
        tx.execute(DROP_OLD_CONSENSUSES, [now - expiration.consensuses])?;
        tx.execute(DROP_OLD_ROUTERDESCS, [now - expiration.router_descs])?;

        // Bridge descriptors come from bridges and bridges might send crazy times,
        // so we need to discard any that look like they are from the future,
        // since otherwise wrong far-future timestamps might live in our DB indefinitely.
        #[cfg(feature = "bridge-client")]
        tx.execute(DROP_OLD_BRIDGEDESCS, [now, now])?;

        // Find all consensus blobs that are no longer referenced,
        // and delete their entries from extdocs.
        let remove_consensus_blobs = {
            // TODO: This query can be O(n); but that won't matter for clients.
            // For relays, we may want to add an index to speed it up, if we use this code there too.
            let mut stmt = tx.prepare(FIND_UNREFERENCED_CONSENSUS_EXTDOCS)?;
            let filenames: Vec<String> = stmt
                .query_map([], |row| row.get::<_, String>(0))?
                .collect::<StdResult<Vec<String>, _>>()?;
            drop(stmt);
            let mut stmt = tx.prepare(DELETE_EXTDOC_BY_FILENAME)?;
            for fname in filenames.iter() {
                stmt.execute([fname])?;
            }
            filenames
        };

        tx.commit()?;
        // Now that the transaction has been committed, these blobs are
        // unreferenced in the ExtDocs table, and we can remove them from disk.
        let mut remove_blob_files: HashSet<_> = expired_blobs.iter().collect();
        remove_blob_files.extend(remove_consensus_blobs.iter());

        for name in remove_blob_files {
            let fname = self.blob_dir.join(name);
            if let Ok(fname) = fname {
                if let Err(e) = std::fs::remove_file(&fname) {
                    warn_report!(
                        e,
                        "Couldn't remove orphaned blob file {}",
                        fname.display_lossy()
                    );
                }
            }
        }

        // Finally, sweep any sufficiently old blob files that have no
        // ExtDocs row at all.
        self.remove_unreferenced_blobs(now, expiration)?;

        Ok(())
    }
671
    // Note: We cannot, and do not, call this function when a transaction already exists.
    fn latest_consensus(
        &self,
        flavor: ConsensusFlavor,
        pending: Option<bool>,
    ) -> Result<Option<InputString>> {
        // First attempt: in the common case the blob file is present.
        match self.latest_consensus_internal(flavor, pending)? {
            Ok(s) => return Ok(Some(s)),
            Err(AbsentBlob::NothingToRead) => return Ok(None),
            Err(AbsentBlob::VanishedFile) => {
                // If we get here, the file was vanished.  Clean up the DB and try again.
            }
        }

        // We use unchecked_transaction() here because this API takes a non-mutable `SqliteStore`.
        // `unchecked_transaction()` will give an error if it is used
        // when a transaction already exists.
        // That's fine: We don't call this function from inside this module,
        // when a transaction might exist,
        // and we can't call multiple SqliteStore functions at once: it isn't sync.
        // Here we enforce that:
        static_assertions::assert_not_impl_any!(SqliteStore: Sync);

        // If we decide that this is unacceptable,
        // then since sqlite doesn't really support concurrent use of a connection,
        // we _could_ change the Store::latest_consensus API take &mut self,
        // or we could add a mutex,
        // or we could just not use a transaction object.
        let tx = self.conn.unchecked_transaction()?;
        Self::remove_entries_for_vanished_blobs(&self.blob_dir, &tx)?;
        tx.commit()?;

        // Second attempt, now that stale ExtDocs rows have been cleaned up.
        match self.latest_consensus_internal(flavor, pending)? {
            Ok(s) => Ok(Some(s)),
            Err(AbsentBlob::NothingToRead) => Ok(None),
            Err(AbsentBlob::VanishedFile) => {
                warn!("Somehow remove_entries_for_vanished_blobs didn't resolve a VanishedFile");
                Ok(None)
            }
        }
    }
713
714    fn latest_consensus_meta(&self, flavor: ConsensusFlavor) -> Result<Option<ConsensusMeta>> {
715        let mut stmt = self.conn.prepare(FIND_LATEST_CONSENSUS_META)?;
716        let mut rows = stmt.query(params![flavor.name()])?;
717        if let Some(row) = rows.next()? {
718            Ok(Some(cmeta_from_row(row)?))
719        } else {
720            Ok(None)
721        }
722    }
723    #[cfg(test)]
724    fn consensus_by_meta(&self, cmeta: &ConsensusMeta) -> Result<InputString> {
725        if let Some((text, _)) =
726            self.consensus_by_sha3_digest_of_signed_part(cmeta.sha3_256_of_signed())?
727        {
728            Ok(text)
729        } else {
730            Err(Error::CacheCorruption(
731                "couldn't find a consensus we thought we had.",
732            ))
733        }
734    }
735    fn consensus_by_sha3_digest_of_signed_part(
736        &self,
737        d: &[u8; 32],
738    ) -> Result<Option<(InputString, ConsensusMeta)>> {
739        let digest = hex::encode(d);
740        let mut stmt = self
741            .conn
742            .prepare(FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED)?;
743        let mut rows = stmt.query(params![digest])?;
744        if let Some(row) = rows.next()? {
745            let meta = cmeta_from_row(row)?;
746            let fname: String = row.get(5)?;
747            if let Ok(text) = self.read_blob(&fname)? {
748                return Ok(Some((text, meta)));
749            }
750        }
751        Ok(None)
752    }
    fn store_consensus(
        &mut self,
        cmeta: &ConsensusMeta,
        flavor: ConsensusFlavor,
        pending: bool,
        contents: &str,
    ) -> Result<()> {
        let lifetime = cmeta.lifetime();
        let sha3_of_signed = cmeta.sha3_256_of_signed();
        let sha3_of_whole = cmeta.sha3_256_of_whole();
        let valid_after: OffsetDateTime = lifetime.valid_after().into();
        let fresh_until: OffsetDateTime = lifetime.fresh_until().into();
        let valid_until: OffsetDateTime = lifetime.valid_until().into();

        /// How long to keep a consensus around after it has expired
        const CONSENSUS_LIFETIME: time::Duration = time::Duration::days(4);

        // After a few days have passed, a consensus is no good for
        // anything at all, not even diffs.
        let expires = valid_until + CONSENSUS_LIFETIME;

        // The blob's document type records the flavor: "con_<flavor>".
        let doctype = format!("con_{}", flavor.name());

        // Write the consensus text as a blob; the ExtDocs row and the
        // consensus row below are committed together via the handle.
        let h = self.save_blob_internal(
            contents.as_bytes(),
            &doctype,
            "sha3-256",
            &sha3_of_whole[..],
            expires,
        )?;
        h.tx().execute(
            INSERT_CONSENSUS,
            params![
                valid_after,
                fresh_until,
                valid_until,
                flavor.name(),
                pending,
                hex::encode(sha3_of_signed),
                h.digest_string()
            ],
        )?;
        h.commit()?;
        Ok(())
    }
798    fn mark_consensus_usable(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
799        let d = hex::encode(cmeta.sha3_256_of_whole());
800        let digest = format!("sha3-256-{}", d);
801
802        let tx = self.conn.transaction()?;
803        let n = tx.execute(MARK_CONSENSUS_NON_PENDING, params![digest])?;
804        trace!("Marked {} consensuses usable", n);
805        tx.commit()?;
806
807        Ok(())
808    }
809    fn delete_consensus(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
810        let d = hex::encode(cmeta.sha3_256_of_whole());
811        let digest = format!("sha3-256-{}", d);
812
813        // TODO: We should probably remove the blob as well, but for now
814        // this is enough.
815        let tx = self.conn.transaction()?;
816        tx.execute(REMOVE_CONSENSUS, params![digest])?;
817        tx.commit()?;
818
819        Ok(())
820    }
821
822    fn authcerts(&self, certs: &[AuthCertKeyIds]) -> Result<HashMap<AuthCertKeyIds, String>> {
823        let mut result = HashMap::new();
824        // TODO(nickm): Do I need to get a transaction here for performance?
825        let mut stmt = self.conn.prepare(FIND_AUTHCERT)?;
826
827        for ids in certs {
828            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
829            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
830            if let Some(contents) = stmt
831                .query_row(params![id_digest, sk_digest], |row| row.get::<_, String>(0))
832                .optional()?
833            {
834                result.insert(*ids, contents);
835            }
836        }
837
838        Ok(result)
839    }
840    fn store_authcerts(&mut self, certs: &[(AuthCertMeta, &str)]) -> Result<()> {
841        let tx = self.conn.transaction()?;
842        let mut stmt = tx.prepare(INSERT_AUTHCERT)?;
843        for (meta, content) in certs {
844            let ids = meta.key_ids();
845            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
846            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
847            let published: OffsetDateTime = meta.published().into();
848            let expires: OffsetDateTime = meta.expires().into();
849            stmt.execute(params![id_digest, sk_digest, published, expires, content])?;
850        }
851        stmt.finalize()?;
852        tx.commit()?;
853        Ok(())
854    }
855
856    fn microdescs(&self, digests: &[MdDigest]) -> Result<HashMap<MdDigest, String>> {
857        let mut result = HashMap::new();
858        let mut stmt = self.conn.prepare(FIND_MD)?;
859
860        // TODO(nickm): Should I speed this up with a transaction, or
861        // does it not matter for queries?
862        for md_digest in digests {
863            let h_digest = hex::encode(md_digest);
864            if let Some(contents) = stmt
865                .query_row(params![h_digest], |row| row.get::<_, String>(0))
866                .optional()?
867            {
868                result.insert(*md_digest, contents);
869            }
870        }
871
872        Ok(result)
873    }
874    fn store_microdescs(&mut self, digests: &[(&str, &MdDigest)], when: SystemTime) -> Result<()> {
875        let when: OffsetDateTime = when.into();
876
877        let tx = self.conn.transaction()?;
878        let mut stmt = tx.prepare(INSERT_MD)?;
879
880        for (content, md_digest) in digests {
881            let h_digest = hex::encode(md_digest);
882            stmt.execute(params![h_digest, when, content])?;
883        }
884        stmt.finalize()?;
885        tx.commit()?;
886        Ok(())
887    }
888    fn update_microdescs_listed(&mut self, digests: &[MdDigest], when: SystemTime) -> Result<()> {
889        let tx = self.conn.transaction()?;
890        let mut stmt = tx.prepare(UPDATE_MD_LISTED)?;
891        let when: OffsetDateTime = when.into();
892
893        for md_digest in digests {
894            let h_digest = hex::encode(md_digest);
895            stmt.execute(params![when, h_digest])?;
896        }
897
898        stmt.finalize()?;
899        tx.commit()?;
900        Ok(())
901    }
902
903    #[cfg(feature = "routerdesc")]
904    fn routerdescs(&self, digests: &[RdDigest]) -> Result<HashMap<RdDigest, String>> {
905        let mut result = HashMap::new();
906        let mut stmt = self.conn.prepare(FIND_RD)?;
907
908        // TODO(nickm): Should I speed this up with a transaction, or
909        // does it not matter for queries?
910        for rd_digest in digests {
911            let h_digest = hex::encode(rd_digest);
912            if let Some(contents) = stmt
913                .query_row(params![h_digest], |row| row.get::<_, String>(0))
914                .optional()?
915            {
916                result.insert(*rd_digest, contents);
917            }
918        }
919
920        Ok(result)
921    }
922    #[cfg(feature = "routerdesc")]
923    fn store_routerdescs(&mut self, digests: &[(&str, SystemTime, &RdDigest)]) -> Result<()> {
924        let tx = self.conn.transaction()?;
925        let mut stmt = tx.prepare(INSERT_RD)?;
926
927        for (content, when, rd_digest) in digests {
928            let when: OffsetDateTime = (*when).into();
929            let h_digest = hex::encode(rd_digest);
930            stmt.execute(params![h_digest, when, content])?;
931        }
932        stmt.finalize()?;
933        tx.commit()?;
934        Ok(())
935    }
936
937    #[cfg(feature = "bridge-client")]
938    fn lookup_bridgedesc(&self, bridge: &BridgeConfig) -> Result<Option<CachedBridgeDescriptor>> {
939        let bridge_line = bridge.to_string();
940        Ok(self
941            .conn
942            .query_row(FIND_BRIDGEDESC, params![bridge_line], |row| {
943                let (fetched, document): (OffsetDateTime, _) = row.try_into()?;
944                let fetched = fetched.into();
945                Ok(CachedBridgeDescriptor { fetched, document })
946            })
947            .optional()?)
948    }
949
950    #[cfg(feature = "bridge-client")]
951    fn store_bridgedesc(
952        &mut self,
953        bridge: &BridgeConfig,
954        entry: CachedBridgeDescriptor,
955        until: SystemTime,
956    ) -> Result<()> {
957        if self.is_readonly() {
958            // Hopefully whoever *does* have the lock will update the cache.
959            // Otherwise it will contain a stale entry forever
960            // (which we'll ignore, but waste effort on).
961            return Ok(());
962        }
963        let bridge_line = bridge.to_string();
964        let row = params![
965            bridge_line,
966            OffsetDateTime::from(entry.fetched),
967            OffsetDateTime::from(until),
968            entry.document,
969        ];
970        self.conn.execute(INSERT_BRIDGEDESC, row)?;
971        Ok(())
972    }
973
974    #[cfg(feature = "bridge-client")]
975    fn delete_bridgedesc(&mut self, bridge: &BridgeConfig) -> Result<()> {
976        if self.is_readonly() {
977            // This is called when we find corrupted or stale cache entries,
978            // to stop us wasting time on them next time.
979            // Hopefully whoever *does* have the lock will do this.
980            return Ok(());
981        }
982        let bridge_line = bridge.to_string();
983        self.conn.execute(DELETE_BRIDGEDESC, params![bridge_line])?;
984        Ok(())
985    }
986
987    fn update_protocol_recommendations(
988        &mut self,
989        valid_after: SystemTime,
990        protocols: &tor_netdoc::doc::netstatus::ProtoStatuses,
991    ) -> Result<()> {
992        let json =
993            serde_json::to_string(&protocols).map_err(into_internal!("Cannot encode protocols"))?;
994        let params = params![OffsetDateTime::from(valid_after), json];
995        self.conn.execute(UPDATE_PROTOCOL_STATUS, params)?;
996        Ok(())
997    }
998
999    fn cached_protocol_recommendations(
1000        &self,
1001    ) -> Result<Option<(SystemTime, tor_netdoc::doc::netstatus::ProtoStatuses)>> {
1002        let opt_row: Option<(OffsetDateTime, String)> = self
1003            .conn
1004            .query_row(FIND_LATEST_PROTOCOL_STATUS, [], |row| {
1005                Ok((row.get(0)?, row.get(1)?))
1006            })
1007            .optional()?;
1008
1009        let (date, json) = match opt_row {
1010            Some(v) => v,
1011            None => return Ok(None),
1012        };
1013
1014        let date = date.into();
1015        let statuses: tor_netdoc::doc::netstatus::ProtoStatuses =
1016            serde_json::from_str(json.as_str()).map_err(|e| Error::BadJsonInCache(Arc::new(e)))?;
1017
1018        Ok(Some((date, statuses)))
1019    }
1020}
1021
/// Functionality related to uncommitted blobs.
///
/// A blob write happens in two steps (file on disk, then a row in ExtDocs);
/// the types here make sure that an interrupted write rolls back both.
mod blob_handle {
    use std::path::{Path, PathBuf};

    use crate::Result;
    use rusqlite::Transaction;
    use tor_basic_utils::PathExt as _;
    use tor_error::warn_report;

    /// Handle to a blob that we have saved to disk but
    /// not yet committed to
    /// the database, and the database transaction where we added a reference to it.
    ///
    /// Used to either commit the blob (by calling [`SavedBlobHandle::commit`]),
    /// or roll it back (by dropping the [`SavedBlobHandle`] without committing it.)
    #[must_use]
    pub(super) struct SavedBlobHandle<'a> {
        /// Transaction we're using to add the blob to the ExtDocs table.
        ///
        /// Note that struct fields are dropped in declaration order,
        /// so when we drop an uncommitted SavedBlobHandle,
        /// we roll back the transaction before we delete the file.
        /// (In practice, either order would be fine.)
        tx: Transaction<'a>,
        /// Filename for the file, with respect to the blob directory.
        fname: String,
        /// Declared digest string for this blob. Of the format
        /// "digesttype-hexstr".
        digeststr: String,
        /// An 'unlinker' for the blob file.
        ///
        /// Removes the on-disk file unless we call `forget` on it at
        /// commit time.
        unlinker: Unlinker,
    }

    impl<'a> SavedBlobHandle<'a> {
        /// Construct a SavedBlobHandle from its parts.
        ///
        /// (The blob file itself is expected to be on disk already;
        /// `unlinker` is what removes it if we roll back.)
        pub(super) fn new(
            tx: Transaction<'a>,
            fname: String,
            digeststr: String,
            unlinker: Unlinker,
        ) -> Self {
            Self {
                tx,
                fname,
                digeststr,
                unlinker,
            }
        }

        /// Return a reference to the underlying database transaction.
        ///
        /// Callers use this to add rows referring to the blob before committing.
        pub(super) fn tx(&self) -> &Transaction<'a> {
            &self.tx
        }
        /// Return the digest string of the saved blob.
        /// Other tables use this as a foreign key into ExtDocs.digest
        pub(super) fn digest_string(&self) -> &str {
            self.digeststr.as_ref()
        }
        /// Return the filename of this blob within the blob directory.
        #[allow(unused)] // used for testing.
        pub(super) fn fname(&self) -> &str {
            self.fname.as_ref()
        }
        /// Commit the relevant database transaction.
        pub(super) fn commit(self) -> Result<()> {
            // The blob has been written to disk, so it is safe to
            // commit the transaction.
            // If the commit returns an error, self.unlinker will remove the blob.
            // (This could result in a vanished blob if the commit reports an error,
            // but the transaction is still visible in the database.)
            self.tx.commit()?;
            // If we reach this point, we don't want to remove the file.
            self.unlinker.forget();
            Ok(())
        }
    }

    /// Handle to a file which we might have to delete.
    ///
    /// When this handle is dropped, the file gets deleted, unless you have
    /// first called [`Unlinker::forget`].
    pub(super) struct Unlinker {
        /// The location of the file to remove, or None if we shouldn't
        /// remove it.
        p: Option<PathBuf>,
    }
    impl Unlinker {
        /// Make a new Unlinker for a given filename.
        pub(super) fn new<P: AsRef<Path>>(p: P) -> Self {
            Unlinker {
                p: Some(p.as_ref().to_path_buf()),
            }
        }
        /// Forget about this unlinker, so that the corresponding file won't
        /// get removed when this Unlinker is dropped.
        fn forget(mut self) {
            self.p = None;
        }
    }
    impl Drop for Unlinker {
        fn drop(&mut self) {
            // Best-effort removal: a failure here is only logged, since
            // expiration will eventually clean up orphaned blob files.
            if let Some(p) = self.p.take() {
                if let Err(e) = std::fs::remove_file(&p) {
                    warn_report!(
                        e,
                        "Couldn't remove rolled-back blob file {}",
                        p.display_lossy()
                    );
                }
            }
        }
    }
}
1135
1136/// Convert a hexadecimal sha3-256 digest from the database into an array.
1137fn digest_from_hex(s: &str) -> Result<[u8; 32]> {
1138    let mut bytes = [0_u8; 32];
1139    hex::decode_to_slice(s, &mut bytes[..]).map_err(Error::BadHexInCache)?;
1140    Ok(bytes)
1141}
1142
1143/// Convert a hexadecimal sha3-256 "digest string" as used in the
1144/// digest column from the database into an array.
1145fn digest_from_dstr(s: &str) -> Result<[u8; 32]> {
1146    if let Some(stripped) = s.strip_prefix("sha3-256-") {
1147        digest_from_hex(stripped)
1148    } else {
1149        Err(Error::CacheCorruption("Invalid digest in database"))
1150    }
1151}
1152
1153/// Create a ConsensusMeta from a `Row` returned by one of
1154/// `FIND_LATEST_CONSENSUS_META` or `FIND_CONSENSUS_AND_META_BY_DIGEST`.
1155fn cmeta_from_row(row: &rusqlite::Row<'_>) -> Result<ConsensusMeta> {
1156    let va: OffsetDateTime = row.get(0)?;
1157    let fu: OffsetDateTime = row.get(1)?;
1158    let vu: OffsetDateTime = row.get(2)?;
1159    let d_signed: String = row.get(3)?;
1160    let d_all: String = row.get(4)?;
1161    let lifetime = Lifetime::new(va.into(), fu.into(), vu.into())
1162        .map_err(|_| Error::CacheCorruption("inconsistent lifetime in database"))?;
1163    Ok(ConsensusMeta::new(
1164        lifetime,
1165        digest_from_hex(&d_signed)?,
1166        digest_from_dstr(&d_all)?,
1167    ))
1168}
1169
1170/// Return `SystemTime::get()` as an OffsetDateTime in UTC.
1171fn now_utc() -> OffsetDateTime {
1172    SystemTime::get().into()
1173}
1174
/// Set up the tables for the arti cache schema in a sqlite database.
///
/// This creates schema version 0; the entries in [`UPDATE_SCHEMA`] are then
/// applied in order to bring the database up to [`SCHEMA_VERSION`].
const INSTALL_V0_SCHEMA: &str = "
  -- Helps us version the schema.  The schema here corresponds to a
  -- version number called 'version', and it should be readable by
  -- anybody who is compliant with versions of at least 'readable_by'.
  CREATE TABLE TorSchemaMeta (
     name TEXT NOT NULL PRIMARY KEY,
     version INTEGER NOT NULL,
     readable_by INTEGER NOT NULL
  );

  INSERT INTO TorSchemaMeta (name, version, readable_by) VALUES ( 'TorDirStorage', 0, 0 );

  -- Keeps track of external blobs on disk.
  CREATE TABLE ExtDocs (
    -- Records a digest of the file contents, in the form '<digest_type>-hexstr'
    digest TEXT PRIMARY KEY NOT NULL,
    -- When was this file created?
    created DATE NOT NULL,
    -- After what time will this file definitely be useless?
    expires DATE NOT NULL,
    -- What is the type of this file? Currently supported are 'con_<flavor>'.
    --   (Before tor-dirmgr ~0.28.0, we would erroneously record 'con_flavor' as 'sha3-256';
    --   Nothing depended on this yet, but will be used in the future
    --   as we add more large-document types.)
    type TEXT NOT NULL,
    -- Filename for this file within our blob directory.
    filename TEXT NOT NULL
  );

  -- All the microdescriptors we know about.
  CREATE TABLE Microdescs (
    sha256_digest TEXT PRIMARY KEY NOT NULL,
    last_listed DATE NOT NULL,
    contents BLOB NOT NULL
  );

  -- All the authority certificates we know.
  CREATE TABLE Authcerts (
    id_digest TEXT NOT NULL,
    sk_digest TEXT NOT NULL,
    published DATE NOT NULL,
    expires DATE NOT NULL,
    contents BLOB NOT NULL,
    PRIMARY KEY (id_digest, sk_digest)
  );

  -- All the consensuses we're storing.
  CREATE TABLE Consensuses (
    valid_after DATE NOT NULL,
    fresh_until DATE NOT NULL,
    valid_until DATE NOT NULL,
    flavor TEXT NOT NULL,
    pending BOOLEAN NOT NULL,
    sha3_of_signed_part TEXT NOT NULL,
    digest TEXT NOT NULL,
    FOREIGN KEY (digest) REFERENCES ExtDocs (digest) ON DELETE CASCADE
  );
  CREATE INDEX Consensuses_vu on CONSENSUSES(valid_until);

";
1236
/// Update the database schema, from each version to the next
///
/// Entry `i` of this array migrates the schema from version `i` to
/// version `i + 1`; applying every entry yields [`SCHEMA_VERSION`].
const UPDATE_SCHEMA: &[&str] = &["
  -- Update the database schema from version 0 to version 1.
  CREATE TABLE RouterDescs (
    sha1_digest TEXT PRIMARY KEY NOT NULL,
    published DATE NOT NULL,
    contents BLOB NOT NULL
  );
","
  -- Update the database schema from version 1 to version 2.
  -- We create this table even if the bridge-client feature is disabled, but then don't touch it at all.
  CREATE TABLE BridgeDescs (
    bridge_line TEXT PRIMARY KEY NOT NULL,
    fetched DATE NOT NULL,
    until DATE NOT NULL,
    contents BLOB NOT NULL
  );
","
 -- Update the database schema from version 2 to version 3.

 -- Table to hold our latest ProtocolStatuses object, to tell us if we're obsolete.
 -- We hold this independently from our consensus,
 -- since we want to read it very early in our startup process,
 -- even if the consensus is expired.
 CREATE TABLE ProtocolStatus (
    -- Enforce that there is only one row in this table.
    -- (This is a bit kludgy, but I am assured that it is a common practice.)
    zero INTEGER PRIMARY KEY NOT NULL,
    -- valid-after date of the consensus from which we got this status
    date DATE NOT NULL,
    -- ProtoStatuses object, encoded as json
    statuses TEXT NOT NULL
 );
"];

/// Update the database schema version tracking, from each version to the next
///
/// (Both `?` parameters are the new schema version number.)
const UPDATE_SCHEMA_VERSION: &str = "
  UPDATE TorSchemaMeta SET version=? WHERE version<?;
";

/// Version number used for this version of the arti cache schema.
///
/// Defined as the number of migrations in [`UPDATE_SCHEMA`], since each one
/// advances the version by exactly 1 from a base of 0.
const SCHEMA_VERSION: u32 = UPDATE_SCHEMA.len() as u32;
1279
/// Query: find the latest-expiring consensus of a given flavor with a given
/// pending status.
const FIND_CONSENSUS_P: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE pending = ? AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";

/// Query: find the latest-expiring consensus of a given flavor, regardless of
/// pending status.
const FIND_CONSENSUS: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";

/// Query: Find the lifetime and digests for the latest-expiring
/// non-pending consensus of a given flavor.
const FIND_LATEST_CONSENSUS_META: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, digest
  FROM Consensuses
  WHERE pending = 0 AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";

/// Look up a consensus by its digest-of-signed-part string.
const FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, Consensuses.digest, filename
  FROM Consensuses
  INNER JOIN ExtDocs on ExtDocs.digest = Consensuses.digest
  WHERE Consensuses.sha3_of_signed_part = ?
  LIMIT 1;
";

/// Query: Update the consensus whose digest field is 'digest' to call it
/// no longer pending.
const MARK_CONSENSUS_NON_PENDING: &str = "
  UPDATE Consensuses
  SET pending = 0
  WHERE digest = ?;
";

/// Query: Remove the consensus with a given digest field.
#[allow(dead_code)]
const REMOVE_CONSENSUS: &str = "
  DELETE FROM Consensuses
  WHERE digest = ?;
";

/// Query: Find the authority certificate with given key digests.
const FIND_AUTHCERT: &str = "
  SELECT contents FROM AuthCerts WHERE id_digest = ? AND sk_digest = ?;
";

/// Query: find the microdescriptor with a given hex-encoded sha256 digest
const FIND_MD: &str = "
  SELECT contents
  FROM Microdescs
  WHERE sha256_digest = ?
";

/// Query: find the router descriptors with a given hex-encoded sha1 digest
#[cfg(feature = "routerdesc")]
const FIND_RD: &str = "
  SELECT contents
  FROM RouterDescs
  WHERE sha1_digest = ?
";

/// Query: find every ExtDocs member that has expired.
const FIND_EXPIRED_EXTDOCS: &str = "
  SELECT filename FROM ExtDocs where expires < datetime('now');
";
1360
/// Query: find whether an ExtDoc is listed.
const COUNT_EXTDOC_BY_PATH: &str = "
  SELECT COUNT(*) FROM ExtDocs WHERE filename = ?;
";

/// Query: Add a new entry to ExtDocs.
const INSERT_EXTDOC: &str = "
  INSERT OR REPLACE INTO ExtDocs ( digest, created, expires, type, filename )
  VALUES ( ?, datetime('now'), ?, ?, ? );
";

/// Query: Add a new consensus.
const INSERT_CONSENSUS: &str = "
  INSERT OR REPLACE INTO Consensuses
    ( valid_after, fresh_until, valid_until, flavor, pending, sha3_of_signed_part, digest )
  VALUES ( ?, ?, ?, ?, ?, ?, ? );
";

/// Query: Add a new AuthCert
const INSERT_AUTHCERT: &str = "
  INSERT OR REPLACE INTO Authcerts
    ( id_digest, sk_digest, published, expires, contents)
  VALUES ( ?, ?, ?, ?, ? );
";

/// Query: Add a new microdescriptor
const INSERT_MD: &str = "
  INSERT OR REPLACE INTO Microdescs ( sha256_digest, last_listed, contents )
  VALUES ( ?, ?, ? );
";

/// Query: Add a new router descriptor
#[allow(unused)]
#[cfg(feature = "routerdesc")]
const INSERT_RD: &str = "
  INSERT OR REPLACE INTO RouterDescs ( sha1_digest, published, contents )
  VALUES ( ?, ?, ? );
";

/// Query: Change the time when a given microdescriptor was last listed.
///
/// (The recorded time never moves backwards: the query keeps the later of
/// the stored and supplied values.)
const UPDATE_MD_LISTED: &str = "
  UPDATE Microdescs
  SET last_listed = max(last_listed, ?)
  WHERE sha256_digest = ?;
";

/// Query: Find a cached bridge descriptor
#[cfg(feature = "bridge-client")]
const FIND_BRIDGEDESC: &str = "SELECT fetched, contents FROM BridgeDescs WHERE bridge_line = ?;";
/// Query: Record a cached bridge descriptor
#[cfg(feature = "bridge-client")]
const INSERT_BRIDGEDESC: &str = "
  INSERT OR REPLACE INTO BridgeDescs ( bridge_line, fetched, until, contents )
  VALUES ( ?, ?, ?, ? );
";
/// Query: Remove a cached bridge descriptor
#[cfg(feature = "bridge-client")]
#[allow(dead_code)]
const DELETE_BRIDGEDESC: &str = "DELETE FROM BridgeDescs WHERE bridge_line = ?;";

/// Query: Find all consensus extdocs that are not referenced in the consensus table.
///
/// Note: accepting type `sha3-256` as a synonym for `con_%` is a workaround:
/// older versions of tor-dirmgr erroneously recorded the digest type
/// ('sha3-256') in the `type` column instead of the document type.
const FIND_UNREFERENCED_CONSENSUS_EXTDOCS: &str = "
    SELECT filename FROM ExtDocs WHERE
         (type LIKE 'con_%' OR type = 'sha3-256')
    AND NOT EXISTS
         (SELECT digest FROM Consensuses WHERE Consensuses.digest = ExtDocs.digest);";

/// Query: Discard every expired extdoc.
///
/// External documents aren't exposed through [`Store`].
const DROP_OLD_EXTDOCS: &str = "DELETE FROM ExtDocs WHERE expires < datetime('now');";

/// Query: Discard an extdoc with a given path.
const DELETE_EXTDOC_BY_FILENAME: &str = "DELETE FROM ExtDocs WHERE filename = ?;";

/// Query: List all extdoc filenames.
const FIND_ALL_EXTDOC_FILENAMES: &str = "SELECT filename FROM ExtDocs;";

/// Query: Get the latest protocol status.
const FIND_LATEST_PROTOCOL_STATUS: &str = "SELECT date, statuses FROM ProtocolStatus WHERE zero=0;";
/// Query: Update the latest protocol status.
const UPDATE_PROTOCOL_STATUS: &str = "INSERT OR REPLACE INTO ProtocolStatus VALUES ( 0, ?, ? );";

/// Query: Discard every router descriptor published before a
/// caller-supplied cutoff (nominally 3 months ago).
// TODO: Choose a more realistic time.
const DROP_OLD_ROUTERDESCS: &str = "DELETE FROM RouterDescs WHERE published < ?;";
/// Query: Discard every microdescriptor that hasn't been listed since a
/// caller-supplied cutoff (nominally 3 months ago).
// TODO: Choose a more realistic time.
const DROP_OLD_MICRODESCS: &str = "DELETE FROM Microdescs WHERE last_listed < ?;";
/// Query: Discard every expired authority certificate.
const DROP_OLD_AUTHCERTS: &str = "DELETE FROM Authcerts WHERE expires < ?;";
/// Query: Discard every consensus whose valid-until time is before a
/// caller-supplied cutoff (nominally two days ago).
const DROP_OLD_CONSENSUSES: &str = "DELETE FROM Consensuses WHERE valid_until < ?;";
/// Query: Discard every bridge descriptor that is too old, or from the future.  (Both ?=now.)
#[cfg(feature = "bridge-client")]
const DROP_OLD_BRIDGEDESCS: &str = "DELETE FROM BridgeDescs WHERE ? > until OR fetched > ?;";
1461
1462#[cfg(test)]
1463pub(crate) mod test {
1464    #![allow(clippy::unwrap_used)]
1465    use super::*;
1466    use crate::storage::EXPIRATION_DEFAULTS;
1467    use digest::Digest;
1468    use hex_literal::hex;
1469    use tempfile::{TempDir, tempdir};
1470    use time::ext::NumericalDuration;
1471    use tor_llcrypto::d::Sha3_256;
1472
1473    pub(crate) fn new_empty() -> Result<(TempDir, SqliteStore)> {
1474        let tmp_dir = tempdir().unwrap();
1475        let sql_path = tmp_dir.path().join("db.sql");
1476        let conn = rusqlite::Connection::open(sql_path)?;
1477        let blob_path = tmp_dir.path().join("blobs");
1478        let blob_dir = fs_mistrust::Mistrust::builder()
1479            .dangerously_trust_everyone()
1480            .build()
1481            .unwrap()
1482            .verifier()
1483            .make_secure_dir(blob_path)
1484            .unwrap();
1485        let store = SqliteStore::from_conn(conn, blob_dir)?;
1486
1487        Ok((tmp_dir, store))
1488    }
1489
    #[test]
    fn init() -> Result<()> {
        let tmp_dir = tempdir().unwrap();
        let blob_dir = fs_mistrust::Mistrust::builder()
            .dangerously_trust_everyone()
            .build()
            .unwrap()
            .verifier()
            .secure_dir(&tmp_dir)
            .unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        // Initial setup: everything should work.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Second setup: shouldn't need to upgrade.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Third setup: the schema claims a newer version, but 'readable_by'
        // still says we can read it, so opening should succeed.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET version = 9002;")?;
            let _store = SqliteStore::from_conn(conn, blob_dir.clone())?;
        }
        // Fourth: this says we can't read it, so we'll get an error.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET readable_by = 9001;")?;
            let val = SqliteStore::from_conn(conn, blob_dir);
            assert!(val.is_err());
        }
        Ok(())
    }
1526
1527    #[test]
1528    fn bad_blob_fname() -> Result<()> {
1529        let (_tmp_dir, store) = new_empty()?;
1530
1531        assert!(store.blob_dir.join("abcd").is_ok());
1532        assert!(store.blob_dir.join("abcd..").is_ok());
1533        assert!(store.blob_dir.join("..abcd..").is_ok());
1534        assert!(store.blob_dir.join(".abcd").is_ok());
1535
1536        assert!(store.blob_dir.join("..").is_err());
1537        assert!(store.blob_dir.join("../abcd").is_err());
1538        assert!(store.blob_dir.join("/abcd").is_err());
1539
1540        Ok(())
1541    }
1542
    #[test]
    fn blobs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;

        let now = now_utc();
        let one_week = 1.weeks();

        // One blob that expires a week from now...
        let fname1 = store.save_blob(
            b"Hello world",
            "greeting",
            "sha1",
            &hex!("7b502c3a1f48c8609ae212cdfb639dee39673f5e"),
            now + one_week,
        )?;

        // ...and one that already expired a week ago.
        let fname2 = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now - one_week,
        )?;

        // Blob filenames are "<type>_<digesttype>-<hexdigest>".
        assert_eq!(
            fname1,
            "greeting_sha1-7b502c3a1f48c8609ae212cdfb639dee39673f5e"
        );
        // Both blobs exist on disk with the contents we wrote.
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname1)?).unwrap()[..],
            b"Hello world"
        );
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname2)?).unwrap()[..],
            b"Goodbye, dear friends"
        );

        // Both blobs are registered in the ExtDocs table.
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 2);

        // Reading a blob back via the store gives the original text.
        let blob = store.read_blob(&fname2)?.unwrap();
        assert_eq!(blob.as_str().unwrap(), "Goodbye, dear friends");

        // Now expire: the second file should go away.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(
            &std::fs::read(store.blob_dir.join(&fname1)?).unwrap()[..],
            b"Hello world"
        );
        assert!(std::fs::read(store.blob_dir.join(&fname2)?).is_err());
        // ...and so should its ExtDocs row.
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 1);

        Ok(())
    }
1601
    #[test]
    fn consensus() -> Result<()> {
        // Exercise the consensus lifecycle: store (pending), mark usable,
        // look up by metadata and by digest, and finally delete.
        use tor_netdoc::doc::netstatus;

        let (_tmp_dir, mut store) = new_empty()?;
        let now = now_utc();
        let one_hour = 1.hours();

        // An empty store has no latest-consensus time.
        assert_eq!(
            store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
            None
        );

        // Metadata for a consensus valid from `now` for one hour, fresh
        // until two hours from now, with distinct sha3 digests for the
        // signed part ([0xAB; 32]) and the whole document ([0xBC; 32]).
        let cmeta = ConsensusMeta::new(
            netstatus::Lifetime::new(
                now.into(),
                (now + one_hour).into(),
                SystemTime::from(now + one_hour * 2),
            )
            .unwrap(),
            [0xAB; 32],
            [0xBC; 32],
        );

        // Store it as *pending* (the `true` argument): present but not yet
        // marked usable.
        store.store_consensus(
            &cmeta,
            ConsensusFlavor::Microdesc,
            true,
            "Pretend this is a consensus",
        )?;

        {
            // While pending, it doesn't count as the "latest" consensus...
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                None
            );
            // ...but a lookup that doesn't filter on pending status finds it.
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            // Asking specifically for a non-pending consensus finds nothing.
            let consensus = store.latest_consensus(ConsensusFlavor::Microdesc, Some(false))?;
            assert!(consensus.is_none());
        }

        store.mark_consensus_usable(&cmeta)?;

        {
            // Once usable, the consensus is visible everywhere: its
            // valid-after time is reported...
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                now.into()
            );
            // ...and both the unfiltered and the non-pending lookups find it.
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, Some(false))?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
        }

        {
            // Lookup by the metadata we stored it under succeeds.
            let consensus_text = store.consensus_by_meta(&cmeta)?;
            assert_eq!(consensus_text.as_str()?, "Pretend this is a consensus");

            // Lookup by the sha3 digest of the signed part succeeds too.
            let (is, _cmeta2) = store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .unwrap();
            assert_eq!(is.as_str()?, "Pretend this is a consensus");

            // Metadata with digests we never stored must not match:
            // by-meta lookup errors, by-digest lookup returns None.
            let cmeta3 = ConsensusMeta::new(
                netstatus::Lifetime::new(
                    now.into(),
                    (now + one_hour).into(),
                    SystemTime::from(now + one_hour * 2),
                )
                .unwrap(),
                [0x99; 32],
                [0x99; 32],
            );
            assert!(store.consensus_by_meta(&cmeta3).is_err());

            assert!(
                store
                    .consensus_by_sha3_digest_of_signed_part(&[0x99; 32])?
                    .is_none()
            );
        }

        {
            // Deleting the consensus removes it from digest lookups.
            assert!(
                store
                    .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                    .is_some()
            );
            store.delete_consensus(&cmeta)?;
            assert!(
                store
                    .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                    .is_none()
            );
        }

        Ok(())
    }
1707
1708    #[test]
1709    fn authcerts() -> Result<()> {
1710        let (_tmp_dir, mut store) = new_empty()?;
1711        let now = now_utc();
1712        let one_hour = 1.hours();
1713
1714        let keyids = AuthCertKeyIds {
1715            id_fingerprint: [3; 20].into(),
1716            sk_fingerprint: [4; 20].into(),
1717        };
1718        let keyids2 = AuthCertKeyIds {
1719            id_fingerprint: [4; 20].into(),
1720            sk_fingerprint: [3; 20].into(),
1721        };
1722
1723        let m1 = AuthCertMeta::new(keyids, now.into(), SystemTime::from(now + one_hour * 24));
1724
1725        store.store_authcerts(&[(m1, "Pretend this is a cert")])?;
1726
1727        let certs = store.authcerts(&[keyids, keyids2])?;
1728        assert_eq!(certs.len(), 1);
1729        assert_eq!(certs.get(&keyids).unwrap(), "Pretend this is a cert");
1730
1731        Ok(())
1732    }
1733
1734    #[test]
1735    fn microdescs() -> Result<()> {
1736        let (_tmp_dir, mut store) = new_empty()?;
1737
1738        let now = now_utc();
1739        let one_day = 1.days();
1740
1741        let d1 = [5_u8; 32];
1742        let d2 = [7; 32];
1743        let d3 = [42; 32];
1744        let d4 = [99; 32];
1745
1746        let long_ago: OffsetDateTime = now - one_day * 100;
1747        store.store_microdescs(
1748            &[
1749                ("Fake micro 1", &d1),
1750                ("Fake micro 2", &d2),
1751                ("Fake micro 3", &d3),
1752            ],
1753            long_ago.into(),
1754        )?;
1755
1756        store.update_microdescs_listed(&[d2], now.into())?;
1757
1758        let mds = store.microdescs(&[d2, d3, d4])?;
1759        assert_eq!(mds.len(), 2);
1760        assert_eq!(mds.get(&d1), None);
1761        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
1762        assert_eq!(mds.get(&d3).unwrap(), "Fake micro 3");
1763        assert_eq!(mds.get(&d4), None);
1764
1765        // Now we'll expire.  that should drop everything but d2.
1766        store.expire_all(&EXPIRATION_DEFAULTS)?;
1767        let mds = store.microdescs(&[d2, d3, d4])?;
1768        assert_eq!(mds.len(), 1);
1769        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
1770
1771        Ok(())
1772    }
1773
1774    #[test]
1775    #[cfg(feature = "routerdesc")]
1776    fn routerdescs() -> Result<()> {
1777        let (_tmp_dir, mut store) = new_empty()?;
1778
1779        let now = now_utc();
1780        let one_day = 1.days();
1781        let long_ago: OffsetDateTime = now - one_day * 100;
1782        let recently = now - one_day;
1783
1784        let d1 = [5_u8; 20];
1785        let d2 = [7; 20];
1786        let d3 = [42; 20];
1787        let d4 = [99; 20];
1788
1789        store.store_routerdescs(&[
1790            ("Fake routerdesc 1", long_ago.into(), &d1),
1791            ("Fake routerdesc 2", recently.into(), &d2),
1792            ("Fake routerdesc 3", long_ago.into(), &d3),
1793        ])?;
1794
1795        let rds = store.routerdescs(&[d2, d3, d4])?;
1796        assert_eq!(rds.len(), 2);
1797        assert_eq!(rds.get(&d1), None);
1798        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
1799        assert_eq!(rds.get(&d3).unwrap(), "Fake routerdesc 3");
1800        assert_eq!(rds.get(&d4), None);
1801
1802        // Now we'll expire.  that should drop everything but d2.
1803        store.expire_all(&EXPIRATION_DEFAULTS)?;
1804        let rds = store.routerdescs(&[d2, d3, d4])?;
1805        assert_eq!(rds.len(), 1);
1806        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
1807
1808        Ok(())
1809    }
1810
    #[test]
    fn from_path_rw() -> Result<()> {
        // Exercise opening a store read-only vs. read-write, and upgrading
        // a read-only store to read-write.
        let tmp = tempdir().unwrap();
        let mistrust = fs_mistrust::Mistrust::new_dangerously_trust_everyone();

        // Nothing there: can't open read-only, and a failed read-only open
        // must not create the blob directory as a side effect.
        let r = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true);
        assert!(r.is_err());
        assert!(!tmp.path().join("dir_blobs").try_exists().unwrap());

        // Opening it read-write will create the files.
        {
            let mut store = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, false)?;
            assert!(tmp.path().join("dir_blobs").is_dir());
            assert!(store.lockfile.is_some());
            assert!(!store.is_readonly());
            assert!(store.upgrade_to_readwrite()?); // no-op: already read-write.
        }

        // At this point, we can successfully make a read-only connection.
        {
            let mut store2 = SqliteStore::from_path_and_mistrust(tmp.path(), &mistrust, true)?;
            assert!(store2.is_readonly());

            // Nobody else is locking this, so we can upgrade.
            // (Unlike above, this call really does upgrade the connection.)
            assert!(store2.upgrade_to_readwrite()?);
            assert!(!store2.is_readonly());
        }
        Ok(())
    }
1841
    #[test]
    fn orphaned_blobs() -> Result<()> {
        // Blob files with no matching ExtDocs row are "orphaned":
        // remove_unreferenced_blobs should delete old orphans but keep
        // recent ones (which might still be mid-write by another process).
        let (_tmp_dir, mut store) = new_empty()?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 0);

        let now = now_utc();
        let one_week = 1.weeks();
        // A properly registered blob: present on disk *and* in the DB.
        let _fname_good = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now + one_week,
        )?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 1);

        // Now, create two orphaned blobs: one with a recent timestamp, and one with an older
        // timestamp.
        store
            .blob_dir
            .write_and_replace("fairly_new", b"new contents will stay")?;
        store
            .blob_dir
            .write_and_replace("fairly_old", b"old contents will be removed")?;
        // Backdate the second orphan's mtime by a week so it counts as old.
        filetime::set_file_mtime(
            store.blob_dir.join("fairly_old")?,
            SystemTime::from(now - one_week).into(),
        )
        .expect("Can't adjust mtime");

        assert_eq!(store.blob_dir.read_directory(".")?.count(), 3);

        // Cleanup removes only the old orphan; the registered blob and the
        // recent orphan survive.
        store.remove_unreferenced_blobs(now, &EXPIRATION_DEFAULTS)?;
        assert_eq!(store.blob_dir.read_directory(".")?.count(), 2);

        Ok(())
    }
1884
1885    #[test]
1886    fn unreferenced_consensus_blob() -> Result<()> {
1887        let (_tmp_dir, mut store) = new_empty()?;
1888
1889        let now = now_utc();
1890        let one_week = 1.weeks();
1891
1892        // Make a blob that claims to be a consensus, and which has not yet expired, but which is
1893        // not listed in the consensus table.  It should get removed.
1894        let fname = store.save_blob(
1895            b"pretend this is a consensus",
1896            "con_fake",
1897            "sha1",
1898            &hex!("803e5a45eea7766a62a735e051a25a50ffb9b1cf"),
1899            now + one_week,
1900        )?;
1901
1902        assert_eq!(store.blob_dir.read_directory(".")?.count(), 1);
1903        assert_eq!(
1904            &std::fs::read(store.blob_dir.join(&fname)?).unwrap()[..],
1905            b"pretend this is a consensus"
1906        );
1907        let n: u32 = store
1908            .conn
1909            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
1910        assert_eq!(n, 1);
1911
1912        store.expire_all(&EXPIRATION_DEFAULTS)?;
1913        assert_eq!(store.blob_dir.read_directory(".")?.count(), 0);
1914
1915        let n: u32 = store
1916            .conn
1917            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
1918        assert_eq!(n, 0);
1919
1920        Ok(())
1921    }
1922
1923    #[test]
1924    fn vanished_blob_cleanup() -> Result<()> {
1925        let (_tmp_dir, mut store) = new_empty()?;
1926
1927        let now = now_utc();
1928        let one_week = 1.weeks();
1929
1930        // Make a few blobs.
1931        let mut fnames = vec![];
1932        for idx in 0..8 {
1933            let content = format!("Example {idx}");
1934            let digest = Sha3_256::digest(content.as_bytes());
1935            let fname = store.save_blob(
1936                content.as_bytes(),
1937                "blob",
1938                "sha3-256",
1939                digest.as_slice(),
1940                now + one_week,
1941            )?;
1942            fnames.push(fname);
1943        }
1944
1945        // Delete the odd-numbered blobs.
1946        store.blob_dir.remove_file(&fnames[1])?;
1947        store.blob_dir.remove_file(&fnames[3])?;
1948        store.blob_dir.remove_file(&fnames[5])?;
1949        store.blob_dir.remove_file(&fnames[7])?;
1950
1951        let n_removed = {
1952            let tx = store.conn.transaction()?;
1953            let n = SqliteStore::remove_entries_for_vanished_blobs(&store.blob_dir, &tx)?;
1954            tx.commit()?;
1955            n
1956        };
1957        assert_eq!(n_removed, 4);
1958
1959        // Make sure that it was the _odd-numbered_ ones that got deleted from the DB.
1960        let (n_1,): (u32,) =
1961            store
1962                .conn
1963                .query_row(COUNT_EXTDOC_BY_PATH, params![&fnames[1]], |row| {
1964                    row.try_into()
1965                })?;
1966        let (n_2,): (u32,) =
1967            store
1968                .conn
1969                .query_row(COUNT_EXTDOC_BY_PATH, params![&fnames[2]], |row| {
1970                    row.try_into()
1971                })?;
1972        assert_eq!(n_1, 0);
1973        assert_eq!(n_2, 1);
1974        Ok(())
1975    }
1976
1977    #[test]
1978    fn protocol_statuses() -> Result<()> {
1979        let (_tmp_dir, mut store) = new_empty()?;
1980
1981        let now = SystemTime::get();
1982        let hour = 1.hours();
1983
1984        let valid_after = now;
1985        let protocols = serde_json::from_str(
1986            r#"{
1987            "client":{
1988                "required":"Link=5 LinkAuth=3",
1989                "recommended":"Link=1-5 LinkAuth=2-5"
1990            },
1991            "relay":{
1992                "required":"Wombat=20-22 Knish=25-27",
1993                "recommended":"Wombat=20-30 Knish=20-30"
1994            }
1995            }"#,
1996        )
1997        .unwrap();
1998
1999        let v = store.cached_protocol_recommendations()?;
2000        assert!(v.is_none());
2001
2002        store.update_protocol_recommendations(valid_after, &protocols)?;
2003        let v = store.cached_protocol_recommendations()?.unwrap();
2004        assert_eq!(v.0, now);
2005        assert_eq!(
2006            serde_json::to_string(&protocols).unwrap(),
2007            serde_json::to_string(&v.1).unwrap()
2008        );
2009
2010        let protocols2 = serde_json::from_str(
2011            r#"{
2012            "client":{
2013                "required":"Link=5 ",
2014                "recommended":"Link=1-5"
2015            },
2016            "relay":{
2017                "required":"Wombat=20",
2018                "recommended":"Cons=6"
2019            }
2020            }"#,
2021        )
2022        .unwrap();
2023
2024        let valid_after_2 = now + hour;
2025        store.update_protocol_recommendations(valid_after_2, &protocols2)?;
2026
2027        let v = store.cached_protocol_recommendations()?.unwrap();
2028        assert_eq!(v.0, now + hour);
2029        assert_eq!(
2030            serde_json::to_string(&protocols2).unwrap(),
2031            serde_json::to_string(&v.1).unwrap()
2032        );
2033
2034        Ok(())
2035    }
2036}