mas_tasks/
email.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.

use async_trait::async_trait;
use chrono::Duration;
use mas_email::{Address, EmailVerificationContext, Mailbox};
use mas_storage::queue::{SendEmailAuthenticationCodeJob, VerifyEmailJob};
use mas_templates::TemplateContext as _;
use rand::{Rng, distributions::Uniform};
use tracing::info;

use crate::{
    State,
    new_queue::{JobContext, JobError, RunnableJob},
};

#[async_trait]
impl RunnableJob for VerifyEmailJob {
    #[tracing::instrument(
        name = "job.verify_email",
        fields(user_email.id = %self.user_email_id()),
        skip_all,
        err,
    )]
    async fn run(&self, _state: &State, _context: JobContext) -> Result<(), JobError> {
        // This job was for the old email verification flow, which has been replaced.
        // We still want to consume existing jobs in the queue, so we just make them
        // permanently fail.
        Err(JobError::fail(anyhow::anyhow!("Not implemented")))
    }
}

#[async_trait]
impl RunnableJob for SendEmailAuthenticationCodeJob {
    #[tracing::instrument(
        name = "job.send_email_authentication_code",
        fields(user_email_authentication.id = %self.user_email_authentication_id()),
        skip_all,
        err,
    )]
    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
        let clock = state.clock();
        let mailer = state.mailer();
        let mut rng = state.rng();
        let mut repo = state.repository().await.map_err(JobError::retry)?;

        let user_email_authentication = repo
            .user_email()
            .lookup_authentication(self.user_email_authentication_id())
            .await
            .map_err(JobError::retry)?
            .ok_or(JobError::fail(anyhow::anyhow!(
                "User email authentication not found"
            )))?;

        if user_email_authentication.completed_at.is_some() {
            return Err(JobError::fail(anyhow::anyhow!(
                "User email authentication already completed"
            )));
        }

        // Load the browser session, if any
        let browser_session =
            if let Some(browser_session) = user_email_authentication.user_session_id {
                Some(
                    repo.browser_session()
                        .lookup(browser_session)
                        .await
                        .map_err(JobError::retry)?
                        .ok_or(JobError::fail(anyhow::anyhow!(
                            "Failed to load browser session"
                        )))?,
                )
            } else {
                None
            };

        // Load the registration, if any
        let registration =
            if let Some(registration_id) = user_email_authentication.user_registration_id {
                Some(
                    repo.user_registration()
                        .lookup(registration_id)
                        .await
                        .map_err(JobError::retry)?
                        .ok_or(JobError::fail(anyhow::anyhow!(
                            "Failed to load user registration"
                        )))?,
                )
            } else {
                None
            };

        // Generate a new 6-digit authentication code
        let range = Uniform::<u32>::from(0..1_000_000);
        let code = rng.sample(range);
        let code = format!("{code:06}");
        let code = repo
            .user_email()
            .add_authentication_code(
                &mut rng,
                &clock,
                Duration::minutes(5), // TODO: make this configurable
                &user_email_authentication,
                code,
            )
            .await
            .map_err(JobError::retry)?;

        let address: Address = user_email_authentication
            .email
            .parse()
            .map_err(JobError::fail)?;
        let username_from_session = browser_session.as_ref().map(|s| s.user.username.clone());
        let username_from_registration = registration.as_ref().map(|r| r.username.clone());
        let username = username_from_registration.or(username_from_session);
        let mailbox = Mailbox::new(username, address);

        info!("Sending email verification code to {}", mailbox);

        let language = self.language().parse().map_err(JobError::fail)?;

        let context = EmailVerificationContext::new(code, browser_session, registration)
            .with_language(language);
        mailer
            .send_verification_email(mailbox, &context)
            .await
            .map_err(JobError::fail)?;

        repo.save().await.map_err(JobError::fail)?;

        Ok(())
    }
}