1use std::sync::{Arc, LazyLock};
9
10use mas_data_model::{Clock, SiteConfig};
11use mas_email::Mailer;
12use mas_matrix::HomeserverConnection;
13use mas_router::UrlBuilder;
14use mas_storage::{BoxRepository, RepositoryError, RepositoryFactory};
15use mas_storage_pg::PgRepositoryFactory;
16use new_queue::QueueRunnerError;
17use opentelemetry::metrics::Meter;
18use rand::SeedableRng;
19use sqlx::{Pool, Postgres};
20use tokio_util::{sync::CancellationToken, task::TaskTracker};
21
22pub use crate::new_queue::QueueWorker;
23
24mod database;
25mod email;
26mod matrix;
27mod new_queue;
28mod recovery;
29mod sessions;
30mod user;
31
32static METER: LazyLock<Meter> = LazyLock::new(|| {
33 let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
34 .with_version(env!("CARGO_PKG_VERSION"))
35 .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
36 .build();
37
38 opentelemetry::global::meter_with_scope(scope)
39});
40
/// Shared state passed to `QueueWorker::new` by `init`.
///
/// The clock and the homeserver connection are stored behind `Arc`, so
/// cloning the state shares those instances rather than duplicating them.
#[derive(Clone)]
struct State {
    /// Factory used to create repositories and to obtain the connection pool.
    repository_factory: PgRepositoryFactory,
    /// Mailer returned by `State::mailer`.
    mailer: Mailer,
    /// Type-erased clock returned by `State::clock`.
    clock: Arc<dyn Clock>,
    /// Type-erased homeserver connection returned by
    /// `State::matrix_connection`.
    homeserver: Arc<dyn HomeserverConnection>,
    /// URL builder returned by `State::url_builder`.
    url_builder: UrlBuilder,
    /// Site configuration returned by `State::site_config`.
    site_config: SiteConfig,
}
50
51impl State {
52 pub fn new(
53 repository_factory: PgRepositoryFactory,
54 clock: impl Clock + 'static,
55 mailer: Mailer,
56 homeserver: impl HomeserverConnection + 'static,
57 url_builder: UrlBuilder,
58 site_config: SiteConfig,
59 ) -> Self {
60 Self {
61 repository_factory,
62 mailer,
63 clock: Arc::new(clock),
64 homeserver: Arc::new(homeserver),
65 url_builder,
66 site_config,
67 }
68 }
69
70 pub fn pool(&self) -> Pool<Postgres> {
71 self.repository_factory.pool()
72 }
73
74 pub fn clock(&self) -> &dyn Clock {
75 &self.clock
76 }
77
78 pub fn mailer(&self) -> &Mailer {
79 &self.mailer
80 }
81
82 #[allow(clippy::unused_self, clippy::disallowed_methods)]
84 pub fn rng(&self) -> rand_chacha::ChaChaRng {
85 rand_chacha::ChaChaRng::from_rng(rand::thread_rng()).expect("failed to seed rng")
86 }
87
88 pub async fn repository(&self) -> Result<BoxRepository, RepositoryError> {
89 self.repository_factory.create().await
90 }
91
92 pub fn matrix_connection(&self) -> &dyn HomeserverConnection {
93 self.homeserver.as_ref()
94 }
95
96 pub fn url_builder(&self) -> &UrlBuilder {
97 &self.url_builder
98 }
99
100 pub fn site_config(&self) -> &SiteConfig {
101 &self.site_config
102 }
103}
104
105pub async fn init(
113 repository_factory: PgRepositoryFactory,
114 clock: impl Clock + 'static,
115 mailer: &Mailer,
116 homeserver: impl HomeserverConnection + 'static,
117 url_builder: UrlBuilder,
118 site_config: &SiteConfig,
119 cancellation_token: CancellationToken,
120) -> Result<QueueWorker, QueueRunnerError> {
121 let state = State::new(
122 repository_factory,
123 clock,
124 mailer.clone(),
125 homeserver,
126 url_builder,
127 site_config.clone(),
128 );
129 let mut worker = QueueWorker::new(state, cancellation_token).await?;
130
131 worker
132 .register_handler::<mas_storage::queue::CleanupRevokedOAuthAccessTokensJob>()
133 .register_handler::<mas_storage::queue::CleanupExpiredOAuthAccessTokensJob>()
134 .register_handler::<mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob>()
135 .register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
136 .register_handler::<mas_storage::queue::CleanupUserRegistrationsJob>()
137 .register_handler::<mas_storage::queue::CleanupFinishedCompatSessionsJob>()
138 .register_handler::<mas_storage::queue::CleanupFinishedOAuth2SessionsJob>()
139 .register_handler::<mas_storage::queue::CleanupFinishedUserSessionsJob>()
140 .register_handler::<mas_storage::queue::CleanupOAuthAuthorizationGrantsJob>()
141 .register_handler::<mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob>()
142 .register_handler::<mas_storage::queue::CleanupUserRecoverySessionsJob>()
143 .register_handler::<mas_storage::queue::CleanupUserEmailAuthenticationsJob>()
144 .register_handler::<mas_storage::queue::CleanupUpstreamOAuthSessionsJob>()
145 .register_handler::<mas_storage::queue::CleanupUpstreamOAuthLinksJob>()
146 .register_handler::<mas_storage::queue::CleanupQueueJobsJob>()
147 .register_handler::<mas_storage::queue::DeactivateUserJob>()
148 .register_handler::<mas_storage::queue::DeleteDeviceJob>()
149 .register_handler::<mas_storage::queue::ProvisionDeviceJob>()
150 .register_handler::<mas_storage::queue::ProvisionUserJob>()
151 .register_handler::<mas_storage::queue::ReactivateUserJob>()
152 .register_handler::<mas_storage::queue::SendAccountRecoveryEmailsJob>()
153 .register_handler::<mas_storage::queue::SendEmailAuthenticationCodeJob>()
154 .register_handler::<mas_storage::queue::SyncDevicesJob>()
155 .register_handler::<mas_storage::queue::VerifyEmailJob>()
156 .register_handler::<mas_storage::queue::ExpireInactiveSessionsJob>()
157 .register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
158 .register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
159 .register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
160 .register_handler::<mas_storage::queue::PruneStalePolicyDataJob>()
161 .register_handler::<mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob>()
162 .register_handler::<mas_storage::queue::CleanupInactiveCompatSessionIpsJob>()
163 .register_handler::<mas_storage::queue::CleanupInactiveUserSessionIpsJob>()
164 .register_deprecated_queue("cleanup-expired-tokens")
165 .add_schedule(
166 "cleanup-revoked-oauth-access-tokens",
167 "0 0 * * * *".parse()?,
169 mas_storage::queue::CleanupRevokedOAuthAccessTokensJob,
170 )
171 .add_schedule(
172 "cleanup-revoked-oauth-refresh-tokens",
173 "0 10 * * * *".parse()?,
175 mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob,
176 )
177 .add_schedule(
178 "cleanup-consumed-oauth-refresh-tokens",
179 "0 20 * * * *".parse()?,
181 mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob,
182 )
183 .add_schedule(
184 "cleanup-user-registrations",
185 "0 30 * * * *".parse()?,
187 mas_storage::queue::CleanupUserRegistrationsJob,
188 )
189 .add_schedule(
190 "cleanup-finished-compat-sessions",
191 "0 40 * * * *".parse()?,
193 mas_storage::queue::CleanupFinishedCompatSessionsJob,
194 )
195 .add_schedule(
196 "cleanup-finished-oauth2-sessions",
197 "0 42 * * * *".parse()?,
199 mas_storage::queue::CleanupFinishedOAuth2SessionsJob,
200 )
201 .add_schedule(
202 "cleanup-finished-user-sessions",
203 "0 44 * * * *".parse()?,
205 mas_storage::queue::CleanupFinishedUserSessionsJob,
206 )
207 .add_schedule(
208 "cleanup-oauth-authorization-grants",
209 "0 50 * * * *".parse()?,
211 mas_storage::queue::CleanupOAuthAuthorizationGrantsJob,
212 )
213 .add_schedule(
214 "cleanup-oauth-device-code-grants",
215 "0 55 * * * *".parse()?,
217 mas_storage::queue::CleanupOAuthDeviceCodeGrantsJob,
218 )
219 .add_schedule(
220 "cleanup-user-recovery-sessions",
221 "0 56 * * * *".parse()?,
223 mas_storage::queue::CleanupUserRecoverySessionsJob,
224 )
225 .add_schedule(
226 "cleanup-user-email-authentications",
227 "0 57 * * * *".parse()?,
229 mas_storage::queue::CleanupUserEmailAuthenticationsJob,
230 )
231 .add_schedule(
232 "cleanup-upstream-oauth-sessions",
233 "0 58 * * * *".parse()?,
235 mas_storage::queue::CleanupUpstreamOAuthSessionsJob,
236 )
237 .add_schedule(
238 "cleanup-upstream-oauth-links",
239 "0 59 * * * *".parse()?,
241 mas_storage::queue::CleanupUpstreamOAuthLinksJob,
242 )
243 .add_schedule(
244 "cleanup-queue-jobs",
245 "0 45 * * * *".parse()?,
247 mas_storage::queue::CleanupQueueJobsJob,
248 )
249 .add_schedule(
250 "cleanup-expired-oauth-access-tokens",
251 "0 5 */4 * * *".parse()?,
253 mas_storage::queue::CleanupExpiredOAuthAccessTokensJob,
254 )
255 .add_schedule(
256 "expire-inactive-sessions",
257 "30 */15 * * * *".parse()?,
259 mas_storage::queue::ExpireInactiveSessionsJob,
260 )
261 .add_schedule(
262 "prune-stale-policy-data",
263 "0 0 2 * * *".parse()?,
265 mas_storage::queue::PruneStalePolicyDataJob,
266 )
267 .add_schedule(
268 "cleanup-inactive-oauth2-session-ips",
269 "0 46 * * * *".parse()?,
271 mas_storage::queue::CleanupInactiveOAuth2SessionIpsJob,
272 )
273 .add_schedule(
274 "cleanup-inactive-compat-session-ips",
275 "0 47 * * * *".parse()?,
277 mas_storage::queue::CleanupInactiveCompatSessionIpsJob,
278 )
279 .add_schedule(
280 "cleanup-inactive-user-session-ips",
281 "0 48 * * * *".parse()?,
283 mas_storage::queue::CleanupInactiveUserSessionIpsJob,
284 );
285
286 Ok(worker)
287}
288
289#[expect(clippy::too_many_arguments, reason = "this is fine")]
295pub async fn init_and_run(
296 repository_factory: PgRepositoryFactory,
297 clock: impl Clock + 'static,
298 mailer: &Mailer,
299 homeserver: impl HomeserverConnection + 'static,
300 url_builder: UrlBuilder,
301 site_config: &SiteConfig,
302 cancellation_token: CancellationToken,
303 task_tracker: &TaskTracker,
304) -> Result<(), QueueRunnerError> {
305 let worker = init(
306 repository_factory,
307 clock,
308 mailer,
309 homeserver,
310 url_builder,
311 site_config,
312 cancellation_token,
313 )
314 .await?;
315
316 task_tracker.spawn(worker.run());
317
318 Ok(())
319}