mas_tasks/
lib.rs

1// Copyright 2025, 2026 Element Creations Ltd.
2// Copyright 2024, 2025 New Vector Ltd.
3// Copyright 2021-2024 The Matrix.org Foundation C.I.C.
4//
5// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
6// Please see LICENSE files in the repository root for full details.
7
8use std::sync::{Arc, LazyLock};
9
10use mas_data_model::{Clock, SiteConfig};
11use mas_email::Mailer;
12use mas_matrix::HomeserverConnection;
13use mas_router::UrlBuilder;
14use mas_storage::{BoxRepository, RepositoryError, RepositoryFactory};
15use mas_storage_pg::PgRepositoryFactory;
16use new_queue::QueueRunnerError;
17use opentelemetry::metrics::Meter;
18use rand::SeedableRng;
19use sqlx::{Pool, Postgres};
20use tokio_util::{sync::CancellationToken, task::TaskTracker};
21
22pub use crate::new_queue::QueueWorker;
23
24mod cleanup;
25mod email;
26mod matrix;
27mod new_queue;
28mod recovery;
29mod sessions;
30mod user;
31
32static METER: LazyLock<Meter> = LazyLock::new(|| {
33    let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
34        .with_version(env!("CARGO_PKG_VERSION"))
35        .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
36        .build();
37
38    opentelemetry::global::meter_with_scope(scope)
39});
40
41#[derive(Clone)]
42struct State {
43    repository_factory: PgRepositoryFactory,
44    mailer: Mailer,
45    clock: Arc<dyn Clock>,
46    homeserver: Arc<dyn HomeserverConnection>,
47    url_builder: UrlBuilder,
48    site_config: SiteConfig,
49}
50
51impl State {
52    pub fn new(
53        repository_factory: PgRepositoryFactory,
54        clock: impl Clock + 'static,
55        mailer: Mailer,
56        homeserver: impl HomeserverConnection + 'static,
57        url_builder: UrlBuilder,
58        site_config: SiteConfig,
59    ) -> Self {
60        Self {
61            repository_factory,
62            mailer,
63            clock: Arc::new(clock),
64            homeserver: Arc::new(homeserver),
65            url_builder,
66            site_config,
67        }
68    }
69
70    pub fn pool(&self) -> Pool<Postgres> {
71        self.repository_factory.pool()
72    }
73
74    pub fn clock(&self) -> &dyn Clock {
75        &self.clock
76    }
77
78    pub fn mailer(&self) -> &Mailer {
79        &self.mailer
80    }
81
82    // This is fine for now, we may move that to a trait at some point.
83    #[allow(clippy::unused_self, clippy::disallowed_methods)]
84    pub fn rng(&self) -> rand_chacha::ChaChaRng {
85        rand_chacha::ChaChaRng::from_rng(rand::thread_rng()).expect("failed to seed rng")
86    }
87
88    pub async fn repository(&self) -> Result<BoxRepository, RepositoryError> {
89        self.repository_factory.create().await
90    }
91
92    pub fn matrix_connection(&self) -> &dyn HomeserverConnection {
93        self.homeserver.as_ref()
94    }
95
96    pub fn url_builder(&self) -> &UrlBuilder {
97        &self.url_builder
98    }
99
100    pub fn site_config(&self) -> &SiteConfig {
101        &self.site_config
102    }
103}
104
105/// Initialise the worker, without running it.
106///
107/// This is mostly useful for tests.
108///
109/// # Errors
110///
111/// This function can fail if the database connection fails.
112pub async fn init(
113    repository_factory: PgRepositoryFactory,
114    clock: impl Clock + 'static,
115    mailer: &Mailer,
116    homeserver: impl HomeserverConnection + 'static,
117    url_builder: UrlBuilder,
118    site_config: &SiteConfig,
119    cancellation_token: CancellationToken,
120) -> Result<QueueWorker, QueueRunnerError> {
121    let state = State::new(
122        repository_factory,
123        clock,
124        mailer.clone(),
125        homeserver,
126        url_builder,
127        site_config.clone(),
128    );
129    let mut worker = QueueWorker::new(state, cancellation_token).await?;
130
131    worker
132        .register_handler::<mas_storage::queue::CleanupRevokedOAuthAccessTokensJob>()
133        .register_handler::<mas_storage::queue::CleanupExpiredOAuthAccessTokensJob>()
134        .register_handler::<mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob>()
135        .register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
136        .register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
137        .register_handler::<mas_storage::queue::CleanupFinishedCompatSessionsJob>()
138        .register_handler::<mas_storage::queue::CleanupFinishedOAuth2SessionsJob>()
139        .register_handler::<mas_storage::queue::CleanupFinishedUserSessionsJob>()
140        .register_handler::<mas_storage::queue::CleanupOAuthAuthorizationGrantsJob>()
141        .register_handler::<mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob>()
142        .register_handler::<mas_storage::queue::CleanupUserRecoverySessionsJob>()
143        .register_handler::<mas_storage::queue::CleanupUserEmailAuthenticationsJob>()
144        .register_handler::<mas_storage::queue::CleanupUpstreamOAuthSessionsJob>()
145        .register_handler::<mas_storage::queue::CleanupUpstreamOAuthLinksJob>()
146        .register_handler::<mas_storage::queue::CleanupQueueJobsJob>()
147        .register_handler::<mas_storage::queue::DeactivateUserJob>()
148        .register_handler::<mas_storage::queue::DeleteDeviceJob>()
149        .register_handler::<mas_storage::queue::ProvisionDeviceJob>()
150        .register_handler::<mas_storage::queue::ProvisionUserJob>()
151        .register_handler::<mas_storage::queue::ReactivateUserJob>()
152        .register_handler::<mas_storage::queue::SendAccountRecoveryEmailsJob>()
153        .register_handler::<mas_storage::queue::SendEmailAuthenticationCodeJob>()
154        .register_handler::<mas_storage::queue::SyncDevicesJob>()
155        .register_handler::<mas_storage::queue::VerifyEmailJob>()
156        .register_handler::<mas_storage::queue::ExpireInactiveSessionsJob>()
157        .register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
158        .register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
159        .register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
160        .register_handler::<mas_storage::queue::PruneStalePolicyDataJob>()
161        .register_handler::<mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob>()
162        .register_handler::<mas_storage::queue::CleanupInactiveCompatSessionIpsJob>()
163        .register_handler::<mas_storage::queue::CleanupInactiveUserSessionIpsJob>()
164        .register_deprecated_queue("cleanup-expired-tokens")
165        // Recurring jobs are spread across the hour at ~5 minute intervals
166        // to avoid clustering and distribute database load evenly.
167        .add_schedule(
168            "cleanup-revoked-oauth-access-tokens",
169            // Run this job every hour at minute 0
170            "0 0 * * * *".parse()?,
171            mas_storage::queue::CleanupRevokedOAuthAccessTokensJob,
172        )
173        .add_schedule(
174            "cleanup-revoked-oauth-refresh-tokens",
175            // Run this job every hour at minute 5
176            "0 5 * * * *".parse()?,
177            mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob,
178        )
179        .add_schedule(
180            "cleanup-consumed-oauth-refresh-tokens",
181            // Run this job every hour at minute 5 (safe to parallelize with revoked)
182            "0 5 * * * *".parse()?,
183            mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob,
184        )
185        .add_schedule(
186            "cleanup-finished-compat-sessions",
187            // Run this job every hour at minute 10
188            "0 10 * * * *".parse()?,
189            mas_storage::queue::CleanupFinishedCompatSessionsJob,
190        )
191        .add_schedule(
192            "cleanup-finished-oauth2-sessions",
193            // Run this job every hour at minute 15
194            "0 15 * * * *".parse()?,
195            mas_storage::queue::CleanupFinishedOAuth2SessionsJob,
196        )
197        .add_schedule(
198            "cleanup-finished-user-sessions",
199            // Run this job every hour at minute 20
200            "0 20 * * * *".parse()?,
201            mas_storage::queue::CleanupFinishedUserSessionsJob,
202        )
203        .add_schedule(
204            "cleanup-inactive-oauth2-session-ips",
205            // Run this job every hour at minute 25
206            "0 25 * * * *".parse()?,
207            mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob,
208        )
209        .add_schedule(
210            "cleanup-inactive-compat-session-ips",
211            // Run this job every hour at minute 25
212            "0 25 * * * *".parse()?,
213            mas_storage::queue::CleanupInactiveCompatSessionIpsJob,
214        )
215        .add_schedule(
216            "cleanup-inactive-user-session-ips",
217            // Run this job every hour at minute 25
218            "0 25 * * * *".parse()?,
219            mas_storage::queue::CleanupInactiveUserSessionIpsJob,
220        )
221        .add_schedule(
222            "cleanup-oauth-authorization-grants",
223            // Run this job every hour at minute 30
224            "0 30 * * * *".parse()?,
225            mas_storage::queue::CleanupOAuthAuthorizationGrantsJob,
226        )
227        .add_schedule(
228            "cleanup-oauth-device-code-grants",
229            // Run this job every hour at minute 35
230            "0 35 * * * *".parse()?,
231            mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob,
232        )
233        .add_schedule(
234            "cleanup-upstream-oauth-sessions",
235            // Run this job every hour at minute 40 (independent, safe to parallelize)
236            "0 40 * * * *".parse()?,
237            mas_storage::queue::CleanupUpstreamOAuthSessionsJob,
238        )
239        .add_schedule(
240            "cleanup-upstream-oauth-links",
241            // Run this job every hour at minute 40
242            "0 40 * * * *".parse()?,
243            mas_storage::queue::CleanupUpstreamOAuthLinksJob,
244        )
245        // User cleanup jobs (minutes 45, 50)
246        .add_schedule(
247            "cleanup-user-registrations",
248            // Run this job every hour at minute 45
249            "0 45 * * * *".parse()?,
250            mas_storage::queue::CleanupUserRegistrationsJob,
251        )
252        .add_schedule(
253            "cleanup-user-recovery-sessions",
254            // Run this job every hour at minute 50
255            "0 50 * * * *".parse()?,
256            mas_storage::queue::CleanupUserRecoverySessionsJob,
257        )
258        .add_schedule(
259            "cleanup-user-email-authentications",
260            // Run this job every hour at minute 50
261            "0 50 * * * *".parse()?,
262            mas_storage::queue::CleanupUserEmailAuthenticationsJob,
263        )
264        .add_schedule(
265            "cleanup-queue-jobs",
266            // Run this job every hour at minute 55
267            "0 55 * * * *".parse()?,
268            mas_storage::queue::CleanupQueueJobsJob,
269        )
270        .add_schedule(
271            "cleanup-expired-oauth-access-tokens",
272            // Run this job every 4 hours at minute 5
273            "0 5 */4 * * *".parse()?,
274            mas_storage::queue::CleanupExpiredOAuthAccessTokensJob,
275        )
276        .add_schedule(
277            "expire-inactive-sessions",
278            // Run this job every 15 minutes at second 30
279            "30 */15 * * * *".parse()?,
280            mas_storage::queue::ExpireInactiveSessionsJob,
281        )
282        .add_schedule(
283            "prune-stale-policy-data",
284            // Run once a day at 2:00 AM
285            "0 0 2 * * *".parse()?,
286            mas_storage::queue::PruneStalePolicyDataJob,
287        );
288
289    Ok(worker)
290}
291
292/// Initialise the worker and run it.
293///
294/// # Errors
295///
296/// This function can fail if the database connection fails.
297#[expect(clippy::too_many_arguments, reason = "this is fine")]
298pub async fn init_and_run(
299    repository_factory: PgRepositoryFactory,
300    clock: impl Clock + 'static,
301    mailer: &Mailer,
302    homeserver: impl HomeserverConnection + 'static,
303    url_builder: UrlBuilder,
304    site_config: &SiteConfig,
305    cancellation_token: CancellationToken,
306    task_tracker: &TaskTracker,
307) -> Result<(), QueueRunnerError> {
308    let worker = init(
309        repository_factory,
310        clock,
311        mailer,
312        homeserver,
313        url_builder,
314        site_config,
315        cancellation_token,
316    )
317    .await?;
318
319    task_tracker.spawn(worker.run());
320
321    Ok(())
322}