mas_tasks/
lib.rs

1// Copyright 2024, 2025 New Vector Ltd.
2// Copyright 2021-2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7use std::sync::{Arc, LazyLock};
8
9use mas_data_model::SiteConfig;
10use mas_email::Mailer;
11use mas_matrix::HomeserverConnection;
12use mas_router::UrlBuilder;
13use mas_storage::{BoxRepository, Clock, RepositoryError, RepositoryFactory};
14use mas_storage_pg::PgRepositoryFactory;
15use new_queue::QueueRunnerError;
16use opentelemetry::metrics::Meter;
17use rand::SeedableRng;
18use sqlx::{Pool, Postgres};
19use tokio_util::{sync::CancellationToken, task::TaskTracker};
20
21pub use crate::new_queue::QueueWorker;
22
23mod database;
24mod email;
25mod matrix;
26mod new_queue;
27mod recovery;
28mod sessions;
29mod user;
30
31static METER: LazyLock<Meter> = LazyLock::new(|| {
32    let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
33        .with_version(env!("CARGO_PKG_VERSION"))
34        .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
35        .build();
36
37    opentelemetry::global::meter_with_scope(scope)
38});
39
40#[derive(Clone)]
41struct State {
42    repository_factory: PgRepositoryFactory,
43    mailer: Mailer,
44    clock: Arc<dyn Clock>,
45    homeserver: Arc<dyn HomeserverConnection>,
46    url_builder: UrlBuilder,
47    site_config: SiteConfig,
48}
49
50impl State {
51    pub fn new(
52        repository_factory: PgRepositoryFactory,
53        clock: impl Clock + 'static,
54        mailer: Mailer,
55        homeserver: impl HomeserverConnection + 'static,
56        url_builder: UrlBuilder,
57        site_config: SiteConfig,
58    ) -> Self {
59        Self {
60            repository_factory,
61            mailer,
62            clock: Arc::new(clock),
63            homeserver: Arc::new(homeserver),
64            url_builder,
65            site_config,
66        }
67    }
68
69    pub fn pool(&self) -> Pool<Postgres> {
70        self.repository_factory.pool()
71    }
72
73    pub fn clock(&self) -> &dyn Clock {
74        &self.clock
75    }
76
77    pub fn mailer(&self) -> &Mailer {
78        &self.mailer
79    }
80
81    // This is fine for now, we may move that to a trait at some point.
82    #[allow(clippy::unused_self, clippy::disallowed_methods)]
83    pub fn rng(&self) -> rand_chacha::ChaChaRng {
84        rand_chacha::ChaChaRng::from_rng(rand::thread_rng()).expect("failed to seed rng")
85    }
86
87    pub async fn repository(&self) -> Result<BoxRepository, RepositoryError> {
88        self.repository_factory.create().await
89    }
90
91    pub fn matrix_connection(&self) -> &dyn HomeserverConnection {
92        self.homeserver.as_ref()
93    }
94
95    pub fn url_builder(&self) -> &UrlBuilder {
96        &self.url_builder
97    }
98
99    pub fn site_config(&self) -> &SiteConfig {
100        &self.site_config
101    }
102}
103
104/// Initialise the worker, without running it.
105///
106/// This is mostly useful for tests.
107///
108/// # Errors
109///
110/// This function can fail if the database connection fails.
111pub async fn init(
112    repository_factory: PgRepositoryFactory,
113    clock: impl Clock + 'static,
114    mailer: &Mailer,
115    homeserver: impl HomeserverConnection + 'static,
116    url_builder: UrlBuilder,
117    site_config: &SiteConfig,
118    cancellation_token: CancellationToken,
119) -> Result<QueueWorker, QueueRunnerError> {
120    let state = State::new(
121        repository_factory,
122        clock,
123        mailer.clone(),
124        homeserver,
125        url_builder,
126        site_config.clone(),
127    );
128    let mut worker = QueueWorker::new(state, cancellation_token).await?;
129
130    worker
131        .register_handler::<mas_storage::queue::CleanupExpiredTokensJob>()
132        .register_handler::<mas_storage::queue::DeactivateUserJob>()
133        .register_handler::<mas_storage::queue::DeleteDeviceJob>()
134        .register_handler::<mas_storage::queue::ProvisionDeviceJob>()
135        .register_handler::<mas_storage::queue::ProvisionUserJob>()
136        .register_handler::<mas_storage::queue::ReactivateUserJob>()
137        .register_handler::<mas_storage::queue::SendAccountRecoveryEmailsJob>()
138        .register_handler::<mas_storage::queue::SendEmailAuthenticationCodeJob>()
139        .register_handler::<mas_storage::queue::SyncDevicesJob>()
140        .register_handler::<mas_storage::queue::VerifyEmailJob>()
141        .register_handler::<mas_storage::queue::ExpireInactiveSessionsJob>()
142        .register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
143        .register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
144        .register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
145        .register_handler::<mas_storage::queue::PruneStalePolicyDataJob>()
146        .add_schedule(
147            "cleanup-expired-tokens",
148            "0 0 * * * *".parse()?,
149            mas_storage::queue::CleanupExpiredTokensJob,
150        )
151        .add_schedule(
152            "expire-inactive-sessions",
153            // Run this job every 15 minutes
154            "30 */15 * * * *".parse()?,
155            mas_storage::queue::ExpireInactiveSessionsJob,
156        )
157        .add_schedule(
158            "prune-stale-policy-data",
159            // Run once a day
160            "0 0 2 * * *".parse()?,
161            mas_storage::queue::PruneStalePolicyDataJob,
162        );
163
164    Ok(worker)
165}
166
167/// Initialise the worker and run it.
168///
169/// # Errors
170///
171/// This function can fail if the database connection fails.
172#[expect(clippy::too_many_arguments, reason = "this is fine")]
173pub async fn init_and_run(
174    repository_factory: PgRepositoryFactory,
175    clock: impl Clock + 'static,
176    mailer: &Mailer,
177    homeserver: impl HomeserverConnection + 'static,
178    url_builder: UrlBuilder,
179    site_config: &SiteConfig,
180    cancellation_token: CancellationToken,
181    task_tracker: &TaskTracker,
182) -> Result<(), QueueRunnerError> {
183    let worker = init(
184        repository_factory,
185        clock,
186        mailer,
187        homeserver,
188        url_builder,
189        site_config,
190        cancellation_token,
191    )
192    .await?;
193
194    task_tracker.spawn(worker.run());
195
196    Ok(())
197}