mas_tasks/
recovery.rs

1// Copyright 2024, 2025 New Vector Ltd.
2// Copyright 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7use anyhow::Context;
8use async_trait::async_trait;
9use mas_email::{Address, Mailbox};
10use mas_i18n::DataLocale;
11use mas_storage::{
12    Pagination, RepositoryAccess,
13    queue::SendAccountRecoveryEmailsJob,
14    user::{UserEmailFilter, UserRecoveryRepository},
15};
16use mas_templates::{EmailRecoveryContext, TemplateContext};
17use rand::distributions::{Alphanumeric, DistString};
18use tracing::{error, info};
19
20use crate::{
21    State,
22    new_queue::{JobContext, JobError, RunnableJob},
23};
24
25/// Job to send account recovery emails for a given recovery session.
26#[async_trait]
27impl RunnableJob for SendAccountRecoveryEmailsJob {
28    #[tracing::instrument(
29        name = "job.send_account_recovery_email",
30        fields(
31            user_recovery_session.id = %self.user_recovery_session_id(),
32            user_recovery_session.email,
33        ),
34        skip_all,
35    )]
36    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
37        let clock = state.clock();
38        let mailer = state.mailer();
39        let url_builder = state.url_builder();
40        let mut rng = state.rng();
41        let mut repo = state.repository().await.map_err(JobError::retry)?;
42
43        let session = repo
44            .user_recovery()
45            .lookup_session(self.user_recovery_session_id())
46            .await
47            .map_err(JobError::retry)?
48            .context("User recovery session not found")
49            .map_err(JobError::fail)?;
50
51        tracing::Span::current().record("user_recovery_session.email", &session.email);
52
53        if session.consumed_at.is_some() {
54            info!("Recovery session already consumed, not sending email");
55            return Ok(());
56        }
57
58        let mut cursor = Pagination::first(50);
59
60        let lang: DataLocale = session
61            .locale
62            .parse()
63            .context("Invalid locale in database on recovery session")
64            .map_err(JobError::fail)?;
65
66        loop {
67            let page = repo
68                .user_email()
69                .list(UserEmailFilter::new().for_email(&session.email), cursor)
70                .await
71                .map_err(JobError::retry)?;
72
73            for edge in page.edges {
74                let ticket = Alphanumeric.sample_string(&mut rng, 32);
75
76                let ticket = repo
77                    .user_recovery()
78                    .add_ticket(&mut rng, clock, &session, &edge.node, ticket)
79                    .await
80                    .map_err(JobError::retry)?;
81
82                let user = repo
83                    .user()
84                    .lookup(edge.node.user_id)
85                    .await
86                    .map_err(JobError::retry)?
87                    .context("User not found")
88                    .map_err(JobError::fail)?;
89
90                let url = url_builder.account_recovery_link(ticket.ticket);
91
92                let address: Address = edge.node.email.parse().map_err(JobError::fail)?;
93                let mailbox = Mailbox::new(Some(user.username.clone()), address);
94
95                info!("Sending recovery email to {}", mailbox);
96                let context = EmailRecoveryContext::new(user, session.clone(), url)
97                    .with_language(lang.clone());
98
99                // XXX: we only log if the email fails to send, to avoid stopping the loop
100                if let Err(e) = mailer.send_recovery_email(mailbox, &context).await {
101                    error!(
102                        error = &e as &dyn std::error::Error,
103                        "Failed to send recovery email"
104                    );
105                }
106
107                cursor = cursor.after(edge.cursor);
108            }
109
110            if !page.has_next_page {
111                break;
112            }
113        }
114
115        repo.save().await.map_err(JobError::fail)?;
116
117        Ok(())
118    }
119}