1use std::sync::{Arc, LazyLock};
9
10use mas_data_model::{Clock, SiteConfig};
11use mas_email::Mailer;
12use mas_matrix::HomeserverConnection;
13use mas_router::UrlBuilder;
14use mas_storage::{BoxRepository, RepositoryError, RepositoryFactory};
15use mas_storage_pg::PgRepositoryFactory;
16use new_queue::QueueRunnerError;
17use opentelemetry::metrics::Meter;
18use rand::SeedableRng;
19use sqlx::{Pool, Postgres};
20use tokio_util::{sync::CancellationToken, task::TaskTracker};
21
22pub use crate::new_queue::QueueWorker;
23
24mod cleanup;
25mod email;
26mod matrix;
27mod new_queue;
28mod recovery;
29mod sessions;
30mod user;
31
32static METER: LazyLock<Meter> = LazyLock::new(|| {
33 let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
34 .with_version(env!("CARGO_PKG_VERSION"))
35 .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
36 .build();
37
38 opentelemetry::global::meter_with_scope(scope)
39});
40
41#[derive(Clone)]
42struct State {
43 repository_factory: PgRepositoryFactory,
44 mailer: Mailer,
45 clock: Arc<dyn Clock>,
46 homeserver: Arc<dyn HomeserverConnection>,
47 url_builder: UrlBuilder,
48 site_config: SiteConfig,
49}
50
51impl State {
52 pub fn new(
53 repository_factory: PgRepositoryFactory,
54 clock: impl Clock + 'static,
55 mailer: Mailer,
56 homeserver: impl HomeserverConnection + 'static,
57 url_builder: UrlBuilder,
58 site_config: SiteConfig,
59 ) -> Self {
60 Self {
61 repository_factory,
62 mailer,
63 clock: Arc::new(clock),
64 homeserver: Arc::new(homeserver),
65 url_builder,
66 site_config,
67 }
68 }
69
70 pub fn pool(&self) -> Pool<Postgres> {
71 self.repository_factory.pool()
72 }
73
74 pub fn clock(&self) -> &dyn Clock {
75 &self.clock
76 }
77
78 pub fn mailer(&self) -> &Mailer {
79 &self.mailer
80 }
81
82 #[allow(clippy::unused_self, clippy::disallowed_methods)]
84 pub fn rng(&self) -> rand_chacha::ChaChaRng {
85 rand_chacha::ChaChaRng::from_rng(rand::thread_rng()).expect("failed to seed rng")
86 }
87
88 pub async fn repository(&self) -> Result<BoxRepository, RepositoryError> {
89 self.repository_factory.create().await
90 }
91
92 pub fn matrix_connection(&self) -> &dyn HomeserverConnection {
93 self.homeserver.as_ref()
94 }
95
96 pub fn url_builder(&self) -> &UrlBuilder {
97 &self.url_builder
98 }
99
100 pub fn site_config(&self) -> &SiteConfig {
101 &self.site_config
102 }
103}
104
105pub async fn init(
113 repository_factory: PgRepositoryFactory,
114 clock: impl Clock + 'static,
115 mailer: &Mailer,
116 homeserver: impl HomeserverConnection + 'static,
117 url_builder: UrlBuilder,
118 site_config: &SiteConfig,
119 cancellation_token: CancellationToken,
120) -> Result<QueueWorker, QueueRunnerError> {
121 let state = State::new(
122 repository_factory,
123 clock,
124 mailer.clone(),
125 homeserver,
126 url_builder,
127 site_config.clone(),
128 );
129 let mut worker = QueueWorker::new(state, cancellation_token).await?;
130
131 worker
132 .register_handler::<mas_storage::queue::CleanupRevokedOAuthAccessTokensJob>()
133 .register_handler::<mas_storage::queue::CleanupExpiredOAuthAccessTokensJob>()
134 .register_handler::<mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob>()
135 .register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
136 .register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
137 .register_handler::<mas_storage::queue::CleanupFinishedCompatSessionsJob>()
138 .register_handler::<mas_storage::queue::CleanupFinishedOAuth2SessionsJob>()
139 .register_handler::<mas_storage::queue::CleanupFinishedUserSessionsJob>()
140 .register_handler::<mas_storage::queue::CleanupOAuthAuthorizationGrantsJob>()
141 .register_handler::<mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob>()
142 .register_handler::<mas_storage::queue::CleanupUserRecoverySessionsJob>()
143 .register_handler::<mas_storage::queue::CleanupUserEmailAuthenticationsJob>()
144 .register_handler::<mas_storage::queue::CleanupUpstreamOAuthSessionsJob>()
145 .register_handler::<mas_storage::queue::CleanupUpstreamOAuthLinksJob>()
146 .register_handler::<mas_storage::queue::CleanupQueueJobsJob>()
147 .register_handler::<mas_storage::queue::DeactivateUserJob>()
148 .register_handler::<mas_storage::queue::DeleteDeviceJob>()
149 .register_handler::<mas_storage::queue::ProvisionDeviceJob>()
150 .register_handler::<mas_storage::queue::ProvisionUserJob>()
151 .register_handler::<mas_storage::queue::ReactivateUserJob>()
152 .register_handler::<mas_storage::queue::SendAccountRecoveryEmailsJob>()
153 .register_handler::<mas_storage::queue::SendEmailAuthenticationCodeJob>()
154 .register_handler::<mas_storage::queue::SyncDevicesJob>()
155 .register_handler::<mas_storage::queue::VerifyEmailJob>()
156 .register_handler::<mas_storage::queue::ExpireInactiveSessionsJob>()
157 .register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
158 .register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
159 .register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
160 .register_handler::<mas_storage::queue::PruneStalePolicyDataJob>()
161 .register_handler::<mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob>()
162 .register_handler::<mas_storage::queue::CleanupInactiveCompatSessionIpsJob>()
163 .register_handler::<mas_storage::queue::CleanupInactiveUserSessionIpsJob>()
164 .register_deprecated_queue("cleanup-expired-tokens")
165 .add_schedule(
168 "cleanup-revoked-oauth-access-tokens",
169 "0 0 * * * *".parse()?,
171 mas_storage::queue::CleanupRevokedOAuthAccessTokensJob,
172 )
173 .add_schedule(
174 "cleanup-revoked-oauth-refresh-tokens",
175 "0 5 * * * *".parse()?,
177 mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob,
178 )
179 .add_schedule(
180 "cleanup-consumed-oauth-refresh-tokens",
181 "0 5 * * * *".parse()?,
183 mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob,
184 )
185 .add_schedule(
186 "cleanup-finished-compat-sessions",
187 "0 10 * * * *".parse()?,
189 mas_storage::queue::CleanupFinishedCompatSessionsJob,
190 )
191 .add_schedule(
192 "cleanup-finished-oauth2-sessions",
193 "0 15 * * * *".parse()?,
195 mas_storage::queue::CleanupFinishedOAuth2SessionsJob,
196 )
197 .add_schedule(
198 "cleanup-finished-user-sessions",
199 "0 20 * * * *".parse()?,
201 mas_storage::queue::CleanupFinishedUserSessionsJob,
202 )
203 .add_schedule(
204 "cleanup-inactive-oauth2-session-ips",
205 "0 25 * * * *".parse()?,
207 mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob,
208 )
209 .add_schedule(
210 "cleanup-inactive-compat-session-ips",
211 "0 25 * * * *".parse()?,
213 mas_storage::queue::CleanupInactiveCompatSessionIpsJob,
214 )
215 .add_schedule(
216 "cleanup-inactive-user-session-ips",
217 "0 25 * * * *".parse()?,
219 mas_storage::queue::CleanupInactiveUserSessionIpsJob,
220 )
221 .add_schedule(
222 "cleanup-oauth-authorization-grants",
223 "0 30 * * * *".parse()?,
225 mas_storage::queue::CleanupOAuthAuthorizationGrantsJob,
226 )
227 .add_schedule(
228 "cleanup-oauth-device-code-grants",
229 "0 35 * * * *".parse()?,
231 mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob,
232 )
233 .add_schedule(
234 "cleanup-upstream-oauth-sessions",
235 "0 40 * * * *".parse()?,
237 mas_storage::queue::CleanupUpstreamOAuthSessionsJob,
238 )
239 .add_schedule(
240 "cleanup-upstream-oauth-links",
241 "0 40 * * * *".parse()?,
243 mas_storage::queue::CleanupUpstreamOAuthLinksJob,
244 )
245 .add_schedule(
247 "cleanup-user-registrations",
248 "0 45 * * * *".parse()?,
250 mas_storage::queue::CleanupUserRegistrationsJob,
251 )
252 .add_schedule(
253 "cleanup-user-recovery-sessions",
254 "0 50 * * * *".parse()?,
256 mas_storage::queue::CleanupUserRecoverySessionsJob,
257 )
258 .add_schedule(
259 "cleanup-user-email-authentications",
260 "0 50 * * * *".parse()?,
262 mas_storage::queue::CleanupUserEmailAuthenticationsJob,
263 )
264 .add_schedule(
265 "cleanup-queue-jobs",
266 "0 55 * * * *".parse()?,
268 mas_storage::queue::CleanupQueueJobsJob,
269 )
270 .add_schedule(
271 "cleanup-expired-oauth-access-tokens",
272 "0 5 */4 * * *".parse()?,
274 mas_storage::queue::CleanupExpiredOAuthAccessTokensJob,
275 )
276 .add_schedule(
277 "expire-inactive-sessions",
278 "30 */15 * * * *".parse()?,
280 mas_storage::queue::ExpireInactiveSessionsJob,
281 )
282 .add_schedule(
283 "prune-stale-policy-data",
284 "0 0 2 * * *".parse()?,
286 mas_storage::queue::PruneStalePolicyDataJob,
287 );
288
289 Ok(worker)
290}
291
292#[expect(clippy::too_many_arguments, reason = "this is fine")]
298pub async fn init_and_run(
299 repository_factory: PgRepositoryFactory,
300 clock: impl Clock + 'static,
301 mailer: &Mailer,
302 homeserver: impl HomeserverConnection + 'static,
303 url_builder: UrlBuilder,
304 site_config: &SiteConfig,
305 cancellation_token: CancellationToken,
306 task_tracker: &TaskTracker,
307) -> Result<(), QueueRunnerError> {
308 let worker = init(
309 repository_factory,
310 clock,
311 mailer,
312 homeserver,
313 url_builder,
314 site_config,
315 cancellation_token,
316 )
317 .await?;
318
319 task_tracker.spawn(worker.run());
320
321 Ok(())
322}