mas_storage/queue/
tasks.rs

1// Copyright 2025, 2026 Element Creations Ltd.
2// Copyright 2024, 2025 New Vector Ltd.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7use chrono::{DateTime, Utc};
8use mas_data_model::{
9    BrowserSession, CompatSession, Device, Session, User, UserEmailAuthentication,
10    UserRecoverySession,
11};
12use serde::{Deserialize, Serialize};
13use ulid::Ulid;
14
15use super::InsertableJob;
16use crate::{Page, Pagination};
17
18/// This is the previous iteration of the email verification job. It has been
19/// replaced by [`SendEmailAuthenticationCodeJob`]. This struct is kept to be
20/// able to consume jobs that are still in the queue.
21#[derive(Serialize, Deserialize, Debug, Clone)]
22pub struct VerifyEmailJob {
23    user_email_id: Ulid,
24    language: Option<String>,
25}
26
27impl VerifyEmailJob {
28    /// The ID of the email address to verify.
29    #[must_use]
30    pub fn user_email_id(&self) -> Ulid {
31        self.user_email_id
32    }
33}
34
35impl InsertableJob for VerifyEmailJob {
36    const QUEUE_NAME: &'static str = "verify-email";
37}
38
39/// A job to send an email authentication code to a user.
40#[derive(Serialize, Deserialize, Debug, Clone)]
41pub struct SendEmailAuthenticationCodeJob {
42    user_email_authentication_id: Ulid,
43    language: String,
44}
45
46impl SendEmailAuthenticationCodeJob {
47    /// Create a new job to send an email authentication code to a user.
48    #[must_use]
49    pub fn new(user_email_authentication: &UserEmailAuthentication, language: String) -> Self {
50        Self {
51            user_email_authentication_id: user_email_authentication.id,
52            language,
53        }
54    }
55
56    /// The language to use for the email.
57    #[must_use]
58    pub fn language(&self) -> &str {
59        &self.language
60    }
61
62    /// The ID of the email authentication to send the code for.
63    #[must_use]
64    pub fn user_email_authentication_id(&self) -> Ulid {
65        self.user_email_authentication_id
66    }
67}
68
69impl InsertableJob for SendEmailAuthenticationCodeJob {
70    const QUEUE_NAME: &'static str = "send-email-authentication-code";
71}
72
73/// A job to provision the user on the homeserver.
74#[derive(Serialize, Deserialize, Debug, Clone)]
75pub struct ProvisionUserJob {
76    user_id: Ulid,
77    set_display_name: Option<String>,
78}
79
80impl ProvisionUserJob {
81    /// Create a new job to provision the user on the homeserver.
82    #[must_use]
83    pub fn new(user: &User) -> Self {
84        Self {
85            user_id: user.id,
86            set_display_name: None,
87        }
88    }
89
90    #[doc(hidden)]
91    #[must_use]
92    pub fn new_for_id(user_id: Ulid) -> Self {
93        Self {
94            user_id,
95            set_display_name: None,
96        }
97    }
98
99    /// Set the display name of the user.
100    #[must_use]
101    pub fn set_display_name(mut self, display_name: String) -> Self {
102        self.set_display_name = Some(display_name);
103        self
104    }
105
106    /// Get the display name to be set.
107    #[must_use]
108    pub fn display_name_to_set(&self) -> Option<&str> {
109        self.set_display_name.as_deref()
110    }
111
112    /// The ID of the user to provision.
113    #[must_use]
114    pub fn user_id(&self) -> Ulid {
115        self.user_id
116    }
117}
118
119impl InsertableJob for ProvisionUserJob {
120    const QUEUE_NAME: &'static str = "provision-user";
121}
122
123/// A job to provision a device for a user on the homeserver.
124///
125/// This job is deprecated, use the `SyncDevicesJob` instead. It is kept to
126/// not break existing jobs in the database.
127#[derive(Serialize, Deserialize, Debug, Clone)]
128pub struct ProvisionDeviceJob {
129    user_id: Ulid,
130    device_id: String,
131}
132
133impl ProvisionDeviceJob {
134    /// The ID of the user to provision the device for.
135    #[must_use]
136    pub fn user_id(&self) -> Ulid {
137        self.user_id
138    }
139
140    /// The ID of the device to provision.
141    #[must_use]
142    pub fn device_id(&self) -> &str {
143        &self.device_id
144    }
145}
146
147impl InsertableJob for ProvisionDeviceJob {
148    const QUEUE_NAME: &'static str = "provision-device";
149}
150
151/// A job to delete a device for a user on the homeserver.
152///
153/// This job is deprecated, use the `SyncDevicesJob` instead. It is kept to
154/// not break existing jobs in the database.
155#[derive(Serialize, Deserialize, Debug, Clone)]
156pub struct DeleteDeviceJob {
157    user_id: Ulid,
158    device_id: String,
159}
160
161impl DeleteDeviceJob {
162    /// Create a new job to delete a device for a user on the homeserver.
163    #[must_use]
164    pub fn new(user: &User, device: &Device) -> Self {
165        Self {
166            user_id: user.id,
167            device_id: device.as_str().to_owned(),
168        }
169    }
170
171    /// The ID of the user to delete the device for.
172    #[must_use]
173    pub fn user_id(&self) -> Ulid {
174        self.user_id
175    }
176
177    /// The ID of the device to delete.
178    #[must_use]
179    pub fn device_id(&self) -> &str {
180        &self.device_id
181    }
182}
183
184impl InsertableJob for DeleteDeviceJob {
185    const QUEUE_NAME: &'static str = "delete-device";
186}
187
188/// A job which syncs the list of devices of a user with the homeserver
189#[derive(Serialize, Deserialize, Debug, Clone)]
190pub struct SyncDevicesJob {
191    user_id: Ulid,
192}
193
194impl SyncDevicesJob {
195    /// Create a new job to sync the list of devices of a user with the
196    /// homeserver
197    #[must_use]
198    pub fn new(user: &User) -> Self {
199        Self { user_id: user.id }
200    }
201
202    /// Create a new job to sync the list of devices of a user with the
203    /// homeserver for the given user ID
204    ///
205    /// This is useful to use in cases where the [`User`] object isn't loaded
206    #[must_use]
207    pub fn new_for_id(user_id: Ulid) -> Self {
208        Self { user_id }
209    }
210
211    /// The ID of the user to sync the devices for
212    #[must_use]
213    pub fn user_id(&self) -> Ulid {
214        self.user_id
215    }
216}
217
218impl InsertableJob for SyncDevicesJob {
219    const QUEUE_NAME: &'static str = "sync-devices";
220}
221
222/// A job to deactivate and lock a user
223#[derive(Serialize, Deserialize, Debug, Clone)]
224pub struct DeactivateUserJob {
225    user_id: Ulid,
226    hs_erase: bool,
227}
228
229impl DeactivateUserJob {
230    /// Create a new job to deactivate and lock a user
231    ///
232    /// # Parameters
233    ///
234    /// * `user` - The user to deactivate
235    /// * `hs_erase` - Whether to erase the user from the homeserver
236    #[must_use]
237    pub fn new(user: &User, hs_erase: bool) -> Self {
238        Self {
239            user_id: user.id,
240            hs_erase,
241        }
242    }
243
244    /// The ID of the user to deactivate
245    #[must_use]
246    pub fn user_id(&self) -> Ulid {
247        self.user_id
248    }
249
250    /// Whether to erase the user from the homeserver
251    #[must_use]
252    pub fn hs_erase(&self) -> bool {
253        self.hs_erase
254    }
255}
256
257impl InsertableJob for DeactivateUserJob {
258    const QUEUE_NAME: &'static str = "deactivate-user";
259}
260
261/// A job to reactivate a user
262#[derive(Serialize, Deserialize, Debug, Clone)]
263pub struct ReactivateUserJob {
264    user_id: Ulid,
265}
266
267impl ReactivateUserJob {
268    /// Create a new job to reactivate a user
269    ///
270    /// # Parameters
271    ///
272    /// * `user` - The user to reactivate
273    #[must_use]
274    pub fn new(user: &User) -> Self {
275        Self { user_id: user.id }
276    }
277
278    /// The ID of the user to reactivate
279    #[must_use]
280    pub fn user_id(&self) -> Ulid {
281        self.user_id
282    }
283}
284
285impl InsertableJob for ReactivateUserJob {
286    const QUEUE_NAME: &'static str = "reactivate-user";
287}
288
289/// Send account recovery emails
290#[derive(Serialize, Deserialize, Debug, Clone)]
291pub struct SendAccountRecoveryEmailsJob {
292    user_recovery_session_id: Ulid,
293}
294
295impl SendAccountRecoveryEmailsJob {
296    /// Create a new job to send account recovery emails
297    ///
298    /// # Parameters
299    ///
300    /// * `user_recovery_session` - The user recovery session to send the email
301    ///   for
302    /// * `language` - The locale to send the email in
303    #[must_use]
304    pub fn new(user_recovery_session: &UserRecoverySession) -> Self {
305        Self {
306            user_recovery_session_id: user_recovery_session.id,
307        }
308    }
309
310    /// The ID of the user recovery session to send the email for
311    #[must_use]
312    pub fn user_recovery_session_id(&self) -> Ulid {
313        self.user_recovery_session_id
314    }
315}
316
317impl InsertableJob for SendAccountRecoveryEmailsJob {
318    const QUEUE_NAME: &'static str = "send-account-recovery-email";
319}
320
321/// Cleanup revoked OAuth 2.0 access tokens
322#[derive(Serialize, Deserialize, Debug, Clone, Default)]
323pub struct CleanupRevokedOAuthAccessTokensJob;
324
325impl InsertableJob for CleanupRevokedOAuthAccessTokensJob {
326    const QUEUE_NAME: &'static str = "cleanup-revoked-oauth-access-tokens";
327}
328
329/// Cleanup expired OAuth 2.0 access tokens
330#[derive(Serialize, Deserialize, Debug, Clone, Default)]
331pub struct CleanupExpiredOAuthAccessTokensJob;
332
333impl InsertableJob for CleanupExpiredOAuthAccessTokensJob {
334    const QUEUE_NAME: &'static str = "cleanup-expired-oauth-access-tokens";
335}
336
337/// Cleanup revoked OAuth 2.0 refresh tokens
338#[derive(Serialize, Deserialize, Debug, Clone, Default)]
339pub struct CleanupRevokedOAuthRefreshTokensJob;
340
341impl InsertableJob for CleanupRevokedOAuthRefreshTokensJob {
342    const QUEUE_NAME: &'static str = "cleanup-revoked-oauth-refresh-tokens";
343}
344
345/// Cleanup consumed OAuth 2.0 refresh tokens
346#[derive(Serialize, Deserialize, Debug, Clone, Default)]
347pub struct CleanupConsumedOAuthRefreshTokensJob;
348
349impl InsertableJob for CleanupConsumedOAuthRefreshTokensJob {
350    const QUEUE_NAME: &'static str = "cleanup-consumed-oauth-refresh-tokens";
351}
352
353/// Cleanup old user registrations
354#[derive(Serialize, Deserialize, Debug, Clone, Default)]
355pub struct CleanupUserRegistrationsJob;
356
357impl InsertableJob for CleanupUserRegistrationsJob {
358    const QUEUE_NAME: &'static str = "cleanup-user-registrations";
359}
360
361/// Cleanup finished compat sessions
362#[derive(Serialize, Deserialize, Debug, Clone, Default)]
363pub struct CleanupFinishedCompatSessionsJob;
364
365impl InsertableJob for CleanupFinishedCompatSessionsJob {
366    const QUEUE_NAME: &'static str = "cleanup-finished-compat-sessions";
367}
368
369/// Cleanup finished OAuth 2.0 sessions
370#[derive(Serialize, Deserialize, Debug, Clone, Default)]
371pub struct CleanupFinishedOAuth2SessionsJob;
372
373impl InsertableJob for CleanupFinishedOAuth2SessionsJob {
374    const QUEUE_NAME: &'static str = "cleanup-finished-oauth2-sessions";
375}
376
377/// Cleanup finished user/browser sessions
378#[derive(Serialize, Deserialize, Debug, Clone, Default)]
379pub struct CleanupFinishedUserSessionsJob;
380
381impl InsertableJob for CleanupFinishedUserSessionsJob {
382    const QUEUE_NAME: &'static str = "cleanup-finished-user-sessions";
383}
384
385/// Cleanup old OAuth 2.0 authorization grants
386#[derive(Serialize, Deserialize, Debug, Clone, Default)]
387pub struct CleanupOAuthAuthorizationGrantsJob;
388
389impl InsertableJob for CleanupOAuthAuthorizationGrantsJob {
390    const QUEUE_NAME: &'static str = "cleanup-oauth-authorization-grants";
391}
392
393/// Cleanup old OAuth 2.0 device code grants
394#[derive(Serialize, Deserialize, Debug, Clone, Default)]
395pub struct CleanupOAuthDeviceCodeGrantsJob;
396
397impl InsertableJob for CleanupOAuthDeviceCodeGrantsJob {
398    const QUEUE_NAME: &'static str = "cleanup-oauth-device-code-grants";
399}
400
401/// Cleanup old user recovery sessions
402#[derive(Serialize, Deserialize, Debug, Clone, Default)]
403pub struct CleanupUserRecoverySessionsJob;
404
405impl InsertableJob for CleanupUserRecoverySessionsJob {
406    const QUEUE_NAME: &'static str = "cleanup-user-recovery-sessions";
407}
408
409/// Cleanup old user email authentications
410#[derive(Serialize, Deserialize, Debug, Clone, Default)]
411pub struct CleanupUserEmailAuthenticationsJob;
412
413impl InsertableJob for CleanupUserEmailAuthenticationsJob {
414    const QUEUE_NAME: &'static str = "cleanup-user-email-authentications";
415}
416
417/// Cleanup old pending upstream OAuth authorization sessions
418#[derive(Serialize, Deserialize, Debug, Clone, Default)]
419pub struct CleanupUpstreamOAuthSessionsJob;
420
421impl InsertableJob for CleanupUpstreamOAuthSessionsJob {
422    const QUEUE_NAME: &'static str = "cleanup-upstream-oauth-sessions";
423}
424
425/// Cleanup orphaned upstream OAuth links
426#[derive(Serialize, Deserialize, Debug, Clone, Default)]
427pub struct CleanupUpstreamOAuthLinksJob;
428
429impl InsertableJob for CleanupUpstreamOAuthLinksJob {
430    const QUEUE_NAME: &'static str = "cleanup-upstream-oauth-links";
431}
432
433/// Cleanup old completed and failed queue jobs
434#[derive(Serialize, Deserialize, Debug, Clone, Default)]
435pub struct CleanupQueueJobsJob;
436
437impl InsertableJob for CleanupQueueJobsJob {
438    const QUEUE_NAME: &'static str = "cleanup-queue-jobs";
439}
440
441/// Scheduled job to expire inactive sessions
442///
443/// This job will trigger jobs to expire inactive compat, oauth and user
444/// sessions.
445#[derive(Serialize, Deserialize, Debug, Clone)]
446pub struct ExpireInactiveSessionsJob;
447
448impl InsertableJob for ExpireInactiveSessionsJob {
449    const QUEUE_NAME: &'static str = "expire-inactive-sessions";
450}
451
452/// Expire inactive OAuth 2.0 sessions
453#[derive(Serialize, Deserialize, Debug, Clone)]
454pub struct ExpireInactiveOAuthSessionsJob {
455    threshold: DateTime<Utc>,
456    after: Option<Ulid>,
457}
458
459impl ExpireInactiveOAuthSessionsJob {
460    /// Create a new job to expire inactive OAuth 2.0 sessions
461    ///
462    /// # Parameters
463    ///
464    /// * `threshold` - The threshold to expire sessions at
465    #[must_use]
466    pub fn new(threshold: DateTime<Utc>) -> Self {
467        Self {
468            threshold,
469            after: None,
470        }
471    }
472
473    /// Get the threshold to expire sessions at
474    #[must_use]
475    pub fn threshold(&self) -> DateTime<Utc> {
476        self.threshold
477    }
478
479    /// Get the pagination cursor
480    #[must_use]
481    pub fn pagination(&self, batch_size: usize) -> Pagination {
482        let pagination = Pagination::first(batch_size);
483        if let Some(after) = self.after {
484            pagination.after(after)
485        } else {
486            pagination
487        }
488    }
489
490    /// Get the next job given the page returned by the database
491    #[must_use]
492    pub fn next(&self, page: &Page<Session>) -> Option<Self> {
493        if !page.has_next_page {
494            return None;
495        }
496
497        let last_edge = page.edges.last()?;
498        Some(Self {
499            threshold: self.threshold,
500            after: Some(last_edge.cursor),
501        })
502    }
503}
504
505impl InsertableJob for ExpireInactiveOAuthSessionsJob {
506    const QUEUE_NAME: &'static str = "expire-inactive-oauth-sessions";
507}
508
509/// Expire inactive compatibility sessions
510#[derive(Serialize, Deserialize, Debug, Clone)]
511pub struct ExpireInactiveCompatSessionsJob {
512    threshold: DateTime<Utc>,
513    after: Option<Ulid>,
514}
515
516impl ExpireInactiveCompatSessionsJob {
517    /// Create a new job to expire inactive compatibility sessions
518    ///
519    /// # Parameters
520    ///
521    /// * `threshold` - The threshold to expire sessions at
522    #[must_use]
523    pub fn new(threshold: DateTime<Utc>) -> Self {
524        Self {
525            threshold,
526            after: None,
527        }
528    }
529
530    /// Get the threshold to expire sessions at
531    #[must_use]
532    pub fn threshold(&self) -> DateTime<Utc> {
533        self.threshold
534    }
535
536    /// Get the pagination cursor
537    #[must_use]
538    pub fn pagination(&self, batch_size: usize) -> Pagination {
539        let pagination = Pagination::first(batch_size);
540        if let Some(after) = self.after {
541            pagination.after(after)
542        } else {
543            pagination
544        }
545    }
546
547    /// Get the next job given the page returned by the database
548    #[must_use]
549    pub fn next(&self, page: &Page<CompatSession>) -> Option<Self> {
550        if !page.has_next_page {
551            return None;
552        }
553
554        let last_edge = page.edges.last()?;
555        Some(Self {
556            threshold: self.threshold,
557            after: Some(last_edge.cursor),
558        })
559    }
560}
561
562impl InsertableJob for ExpireInactiveCompatSessionsJob {
563    const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions";
564}
565
566/// Expire inactive user sessions
567#[derive(Debug, Serialize, Deserialize)]
568pub struct ExpireInactiveUserSessionsJob {
569    threshold: DateTime<Utc>,
570    after: Option<Ulid>,
571}
572
573impl ExpireInactiveUserSessionsJob {
574    /// Create a new job to expire inactive user/browser sessions
575    ///
576    /// # Parameters
577    ///
578    /// * `threshold` - The threshold to expire sessions at
579    #[must_use]
580    pub fn new(threshold: DateTime<Utc>) -> Self {
581        Self {
582            threshold,
583            after: None,
584        }
585    }
586
587    /// Get the threshold to expire sessions at
588    #[must_use]
589    pub fn threshold(&self) -> DateTime<Utc> {
590        self.threshold
591    }
592
593    /// Get the pagination cursor
594    #[must_use]
595    pub fn pagination(&self, batch_size: usize) -> Pagination {
596        let pagination = Pagination::first(batch_size);
597        if let Some(after) = self.after {
598            pagination.after(after)
599        } else {
600            pagination
601        }
602    }
603
604    /// Get the next job given the page returned by the database
605    #[must_use]
606    pub fn next(&self, page: &Page<BrowserSession>) -> Option<Self> {
607        if !page.has_next_page {
608            return None;
609        }
610
611        let last_edge = page.edges.last()?;
612        Some(Self {
613            threshold: self.threshold,
614            after: Some(last_edge.cursor),
615        })
616    }
617}
618
619impl InsertableJob for ExpireInactiveUserSessionsJob {
620    const QUEUE_NAME: &'static str = "expire-inactive-user-sessions";
621}
622
623/// Prune stale policy data
624#[derive(Debug, Serialize, Deserialize)]
625pub struct PruneStalePolicyDataJob;
626
627impl InsertableJob for PruneStalePolicyDataJob {
628    const QUEUE_NAME: &'static str = "prune-stale-policy-data";
629}
630
631/// Cleanup IP addresses from inactive OAuth 2.0 sessions
632#[derive(Serialize, Deserialize, Debug, Clone, Default)]
633pub struct CleanupInactiveOAuth2SessionIpsJob;
634
635impl InsertableJob for CleanupInactiveOAuth2SessionIpsJob {
636    const QUEUE_NAME: &'static str = "cleanup-inactive-oauth2-session-ips";
637}
638
639/// Cleanup IP addresses from inactive compat sessions
640#[derive(Serialize, Deserialize, Debug, Clone, Default)]
641pub struct CleanupInactiveCompatSessionIpsJob;
642
643impl InsertableJob for CleanupInactiveCompatSessionIpsJob {
644    const QUEUE_NAME: &'static str = "cleanup-inactive-compat-session-ips";
645}
646
647/// Cleanup IP addresses from inactive user/browser sessions
648#[derive(Serialize, Deserialize, Debug, Clone, Default)]
649pub struct CleanupInactiveUserSessionIpsJob;
650
651impl InsertableJob for CleanupInactiveUserSessionIpsJob {
652    const QUEUE_NAME: &'static str = "cleanup-inactive-user-session-ips";
653}