mas_tasks/
lib.rs

1// Copyright 2025, 2026 Element Creations Ltd.
2// Copyright 2024, 2025 New Vector Ltd.
3// Copyright 2021-2024 The Matrix.org Foundation C.I.C.
4//
5// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
6// Please see LICENSE files in the repository root for full details.
7
8use std::sync::{Arc, LazyLock};
9
10use mas_data_model::{Clock, SiteConfig};
11use mas_email::Mailer;
12use mas_matrix::HomeserverConnection;
13use mas_router::UrlBuilder;
14use mas_storage::{BoxRepository, RepositoryError, RepositoryFactory};
15use mas_storage_pg::PgRepositoryFactory;
16use new_queue::QueueRunnerError;
17use opentelemetry::metrics::Meter;
18use rand::SeedableRng;
19use sqlx::{Pool, Postgres};
20use tokio_util::{sync::CancellationToken, task::TaskTracker};
21
22pub use crate::new_queue::QueueWorker;
23
24mod database;
25mod email;
26mod matrix;
27mod new_queue;
28mod recovery;
29mod sessions;
30mod user;
31
32static METER: LazyLock<Meter> = LazyLock::new(|| {
33    let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
34        .with_version(env!("CARGO_PKG_VERSION"))
35        .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
36        .build();
37
38    opentelemetry::global::meter_with_scope(scope)
39});
40
/// Shared dependencies handed to the queue worker.
///
/// The struct is `Clone`; the clock and the homeserver connection are held
/// behind `Arc`s as trait objects.
#[derive(Clone)]
struct State {
    /// Factory used to create repositories and to obtain the Postgres pool.
    repository_factory: PgRepositoryFactory,
    /// Mailer exposed through [`State::mailer`].
    mailer: Mailer,
    /// Type-erased clock exposed through [`State::clock`].
    clock: Arc<dyn Clock>,
    /// Type-erased homeserver connection exposed through
    /// [`State::matrix_connection`].
    homeserver: Arc<dyn HomeserverConnection>,
    /// URL builder exposed through [`State::url_builder`].
    url_builder: UrlBuilder,
    /// Site configuration exposed through [`State::site_config`].
    site_config: SiteConfig,
}
50
51impl State {
52    pub fn new(
53        repository_factory: PgRepositoryFactory,
54        clock: impl Clock + 'static,
55        mailer: Mailer,
56        homeserver: impl HomeserverConnection + 'static,
57        url_builder: UrlBuilder,
58        site_config: SiteConfig,
59    ) -> Self {
60        Self {
61            repository_factory,
62            mailer,
63            clock: Arc::new(clock),
64            homeserver: Arc::new(homeserver),
65            url_builder,
66            site_config,
67        }
68    }
69
70    pub fn pool(&self) -> Pool<Postgres> {
71        self.repository_factory.pool()
72    }
73
74    pub fn clock(&self) -> &dyn Clock {
75        &self.clock
76    }
77
78    pub fn mailer(&self) -> &Mailer {
79        &self.mailer
80    }
81
82    // This is fine for now, we may move that to a trait at some point.
83    #[allow(clippy::unused_self, clippy::disallowed_methods)]
84    pub fn rng(&self) -> rand_chacha::ChaChaRng {
85        rand_chacha::ChaChaRng::from_rng(rand::thread_rng()).expect("failed to seed rng")
86    }
87
88    pub async fn repository(&self) -> Result<BoxRepository, RepositoryError> {
89        self.repository_factory.create().await
90    }
91
92    pub fn matrix_connection(&self) -> &dyn HomeserverConnection {
93        self.homeserver.as_ref()
94    }
95
96    pub fn url_builder(&self) -> &UrlBuilder {
97        &self.url_builder
98    }
99
100    pub fn site_config(&self) -> &SiteConfig {
101        &self.site_config
102    }
103}
104
105/// Initialise the worker, without running it.
106///
107/// This is mostly useful for tests.
108///
109/// # Errors
110///
111/// This function can fail if the database connection fails.
112pub async fn init(
113    repository_factory: PgRepositoryFactory,
114    clock: impl Clock + 'static,
115    mailer: &Mailer,
116    homeserver: impl HomeserverConnection + 'static,
117    url_builder: UrlBuilder,
118    site_config: &SiteConfig,
119    cancellation_token: CancellationToken,
120) -> Result<QueueWorker, QueueRunnerError> {
121    let state = State::new(
122        repository_factory,
123        clock,
124        mailer.clone(),
125        homeserver,
126        url_builder,
127        site_config.clone(),
128    );
129    let mut worker = QueueWorker::new(state, cancellation_token).await?;
130
131    worker
132        .register_handler::<mas_storage::queue::CleanupRevokedOAuthAccessTokensJob>()
133        .register_handler::<mas_storage::queue::CleanupExpiredOAuthAccessTokensJob>()
134        .register_handler::<mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob>()
135        .register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
136        .register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
137        .register_handler::<mas_storage::queue::CleanupFinishedCompatSessionsJob>()
138        .register_handler::<mas_storage::queue::CleanupFinishedOAuth2SessionsJob>()
139        .register_handler::<mas_storage::queue::CleanupFinishedUserSessionsJob>()
140        .register_handler::<mas_storage::queue::CleanupOAuthAuthorizationGrantsJob>()
141        .register_handler::<mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob>()
142        .register_handler::<mas_storage::queue::CleanupUserRecoverySessionsJob>()
143        .register_handler::<mas_storage::queue::CleanupUserEmailAuthenticationsJob>()
144        .register_handler::<mas_storage::queue::CleanupUpstreamOAuthSessionsJob>()
145        .register_handler::<mas_storage::queue::CleanupUpstreamOAuthLinksJob>()
146        .register_handler::<mas_storage::queue::CleanupQueueJobsJob>()
147        .register_handler::<mas_storage::queue::DeactivateUserJob>()
148        .register_handler::<mas_storage::queue::DeleteDeviceJob>()
149        .register_handler::<mas_storage::queue::ProvisionDeviceJob>()
150        .register_handler::<mas_storage::queue::ProvisionUserJob>()
151        .register_handler::<mas_storage::queue::ReactivateUserJob>()
152        .register_handler::<mas_storage::queue::SendAccountRecoveryEmailsJob>()
153        .register_handler::<mas_storage::queue::SendEmailAuthenticationCodeJob>()
154        .register_handler::<mas_storage::queue::SyncDevicesJob>()
155        .register_handler::<mas_storage::queue::VerifyEmailJob>()
156        .register_handler::<mas_storage::queue::ExpireInactiveSessionsJob>()
157        .register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
158        .register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
159        .register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
160        .register_handler::<mas_storage::queue::PruneStalePolicyDataJob>()
161        .register_handler::<mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob>()
162        .register_handler::<mas_storage::queue::CleanupInactiveCompatSessionIpsJob>()
163        .register_handler::<mas_storage::queue::CleanupInactiveUserSessionIpsJob>()
164        .register_deprecated_queue("cleanup-expired-tokens")
165        .add_schedule(
166            "cleanup-revoked-oauth-access-tokens",
167            // Run this job every hour
168            "0 0 * * * *".parse()?,
169            mas_storage::queue::CleanupRevokedOAuthAccessTokensJob,
170        )
171        .add_schedule(
172            "cleanup-revoked-oauth-refresh-tokens",
173            // Run this job every hour
174            "0 10 * * * *".parse()?,
175            mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob,
176        )
177        .add_schedule(
178            "cleanup-consumed-oauth-refresh-tokens",
179            // Run this job every hour
180            "0 20 * * * *".parse()?,
181            mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob,
182        )
183        .add_schedule(
184            "cleanup-user-registrations",
185            // Run this job every hour
186            "0 30 * * * *".parse()?,
187            mas_storage::queue::CleanupUserRegistrationsJob,
188        )
189        .add_schedule(
190            "cleanup-finished-compat-sessions",
191            // Run this job every hour
192            "0 40 * * * *".parse()?,
193            mas_storage::queue::CleanupFinishedCompatSessionsJob,
194        )
195        .add_schedule(
196            "cleanup-finished-oauth2-sessions",
197            // Run this job every hour
198            "0 42 * * * *".parse()?,
199            mas_storage::queue::CleanupFinishedOAuth2SessionsJob,
200        )
201        .add_schedule(
202            "cleanup-finished-user-sessions",
203            // Run this job every hour
204            "0 44 * * * *".parse()?,
205            mas_storage::queue::CleanupFinishedUserSessionsJob,
206        )
207        .add_schedule(
208            "cleanup-oauth-authorization-grants",
209            // Run this job every hour
210            "0 50 * * * *".parse()?,
211            mas_storage::queue::CleanupOAuthAuthorizationGrantsJob,
212        )
213        .add_schedule(
214            "cleanup-oauth-device-code-grants",
215            // Run this job every hour
216            "0 55 * * * *".parse()?,
217            mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob,
218        )
219        .add_schedule(
220            "cleanup-user-recovery-sessions",
221            // Run this job every hour
222            "0 56 * * * *".parse()?,
223            mas_storage::queue::CleanupUserRecoverySessionsJob,
224        )
225        .add_schedule(
226            "cleanup-user-email-authentications",
227            // Run this job every hour
228            "0 57 * * * *".parse()?,
229            mas_storage::queue::CleanupUserEmailAuthenticationsJob,
230        )
231        .add_schedule(
232            "cleanup-upstream-oauth-sessions",
233            // Run this job every hour
234            "0 58 * * * *".parse()?,
235            mas_storage::queue::CleanupUpstreamOAuthSessionsJob,
236        )
237        .add_schedule(
238            "cleanup-upstream-oauth-links",
239            // Run this job every hour
240            "0 59 * * * *".parse()?,
241            mas_storage::queue::CleanupUpstreamOAuthLinksJob,
242        )
243        .add_schedule(
244            "cleanup-queue-jobs",
245            // Run this job every hour
246            "0 45 * * * *".parse()?,
247            mas_storage::queue::CleanupQueueJobsJob,
248        )
249        .add_schedule(
250            "cleanup-expired-oauth-access-tokens",
251            // Run this job every 4 hours
252            "0 5 */4 * * *".parse()?,
253            mas_storage::queue::CleanupExpiredOAuthAccessTokensJob,
254        )
255        .add_schedule(
256            "expire-inactive-sessions",
257            // Run this job every 15 minutes
258            "30 */15 * * * *".parse()?,
259            mas_storage::queue::ExpireInactiveSessionsJob,
260        )
261        .add_schedule(
262            "prune-stale-policy-data",
263            // Run once a day
264            "0 0 2 * * *".parse()?,
265            mas_storage::queue::PruneStalePolicyDataJob,
266        )
267        .add_schedule(
268            "cleanup-inactive-oauth2-session-ips",
269            // Run this job every hour
270            "0 46 * * * *".parse()?,
271            mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob,
272        )
273        .add_schedule(
274            "cleanup-inactive-compat-session-ips",
275            // Run this job every hour
276            "0 47 * * * *".parse()?,
277            mas_storage::queue::CleanupInactiveCompatSessionIpsJob,
278        )
279        .add_schedule(
280            "cleanup-inactive-user-session-ips",
281            // Run this job every hour
282            "0 48 * * * *".parse()?,
283            mas_storage::queue::CleanupInactiveUserSessionIpsJob,
284        );
285
286    Ok(worker)
287}
288
289/// Initialise the worker and run it.
290///
291/// # Errors
292///
293/// This function can fail if the database connection fails.
294#[expect(clippy::too_many_arguments, reason = "this is fine")]
295pub async fn init_and_run(
296    repository_factory: PgRepositoryFactory,
297    clock: impl Clock + 'static,
298    mailer: &Mailer,
299    homeserver: impl HomeserverConnection + 'static,
300    url_builder: UrlBuilder,
301    site_config: &SiteConfig,
302    cancellation_token: CancellationToken,
303    task_tracker: &TaskTracker,
304) -> Result<(), QueueRunnerError> {
305    let worker = init(
306        repository_factory,
307        clock,
308        mailer,
309        homeserver,
310        url_builder,
311        site_config,
312        cancellation_token,
313    )
314    .await?;
315
316    task_tracker.spawn(worker.run());
317
318    Ok(())
319}