1use std::time::Instant;
15
16use chrono::{DateTime, Utc};
17use compact_str::CompactString;
18use futures_util::{SinkExt, StreamExt as _, TryFutureExt, TryStreamExt as _};
19use mas_data_model::Clock;
20use rand::{RngCore, SeedableRng};
21use thiserror::Error;
22use thiserror_ext::ContextInto;
23use tokio_util::sync::PollSender;
24use tracing::{Instrument as _, Level, info};
25use ulid::Ulid;
26use uuid::{NonNilUuid, Uuid};
27
28use crate::{
29 HashMap, ProgressCounter, RandomState, SynapseReader,
30 mas_writer::{
31 self, MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
32 MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
33 MasNewUserPassword, MasWriteBuffer, MasWriter,
34 },
35 progress::{EntityType, Progress},
36 synapse_reader::{
37 self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
38 SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
39 },
40};
41
/// Errors that can abort the Synapse to MAS migration.
///
/// Variants carrying a `context` string have it filled in at the call site
/// with a short description of the operation that failed (for example
/// `"counting users"` or `"writing user"`).
#[derive(Debug, Error, ContextInto)]
pub enum Error {
    /// The Synapse reader returned an error.
    #[error("error when reading synapse DB ({context}): {source}")]
    Synapse {
        source: synapse_reader::Error,
        context: String,
    },
    /// The MAS writer returned an error.
    #[error("error when writing to MAS DB ({context}): {source}")]
    Mas {
        source: mas_writer::Error,
        context: String,
    },
    /// The localpart could not be extracted from a Synapse user ID.
    #[error("failed to extract localpart of {user:?}: {source}")]
    ExtractLocalpart {
        source: ExtractLocalpartError,
        user: FullUserId,
    },
    /// The channel feeding a writer task rejected a row read from Synapse.
    #[error("channel closed")]
    ChannelClosed,

    /// A spawned writer task could not be joined.
    #[error("task failed ({context}): {source}")]
    Join {
        source: tokio::task::JoinError,
        context: String,
    },

    /// A row in a dependent table refers to a user that was not recorded
    /// while migrating the users table.
    #[error("user {user} was not found for migration but a row in {table} was found for them")]
    MissingUserFromDependentTable { table: String, user: FullUserId },
    /// An external ID uses an auth provider that has no entry in the provider
    /// ID mapping given to the migration.
    #[error(
        "missing a mapping for the auth provider with ID {synapse_id:?} (used by {user} and maybe other users)"
    )]
    MissingAuthProviderMapping {
        /// The auth provider ID, as found on the Synapse external ID row.
        synapse_id: String,
        /// The user owning the external ID row that triggered the error.
        user: FullUserId,
    },
}
81
82bitflags::bitflags! {
83 #[derive(Debug, Clone, Copy)]
84 struct UserFlags: u8 {
85 const IS_SYNAPSE_ADMIN = 0b0000_0001;
86 const IS_DEACTIVATED = 0b0000_0010;
87 const IS_GUEST = 0b0000_0100;
88 const IS_APPSERVICE = 0b0000_1000;
89 }
90}
91
92impl UserFlags {
93 const fn is_deactivated(self) -> bool {
94 self.contains(UserFlags::IS_DEACTIVATED)
95 }
96
97 const fn is_guest(self) -> bool {
98 self.contains(UserFlags::IS_GUEST)
99 }
100
101 const fn is_synapse_admin(self) -> bool {
102 self.contains(UserFlags::IS_SYNAPSE_ADMIN)
103 }
104
105 const fn is_appservice(self) -> bool {
106 self.contains(UserFlags::IS_APPSERVICE)
107 }
108}
109
/// What the migration remembers about a user once the users table has been
/// processed.
#[derive(Debug, Clone, Copy)]
struct UserInfo {
    /// ID of the user written to MAS; `None` for users that were recorded but
    /// not written (application service users).
    mas_user_id: Option<NonNilUuid>,
    /// Flags derived from the Synapse user row.
    flags: UserFlags,
}
115
/// State accumulated by one migration stage and handed over to the next.
struct MigrationState {
    /// Server name used to extract localparts from full Synapse user IDs.
    server_name: String,

    /// Users recorded while migrating the users table, keyed by username
    /// (the localpart of the Synapse user ID).
    users: HashMap<CompactString, UserInfo>,

    /// Compat session ID allocated for each (MAS user ID, device ID) pair, so
    /// that tokens and devices referring to the same device share one session.
    devices_to_compat_sessions: HashMap<(NonNilUuid, CompactString), Uuid>,

    /// Maps the auth provider ID found on Synapse external IDs to the ID of
    /// the upstream provider to link to in MAS.
    provider_id_mapping: std::collections::HashMap<String, Uuid>,
}
130
131#[expect(clippy::implicit_hasher)]
144#[allow(clippy::too_many_arguments)]
145pub async fn migrate(
146 mut synapse: SynapseReader<'_>,
147 mas: MasWriter,
148 server_name: String,
149 clock: &dyn Clock,
150 rng: &mut impl RngCore,
151 provider_id_mapping: std::collections::HashMap<String, Uuid>,
152 progress: &Progress,
153 ignore_missing_auth_providers: bool,
154) -> Result<(), Error> {
155 let counts = synapse.count_rows().await.into_synapse("counting users")?;
156
157 let state = MigrationState {
158 server_name,
159 users: HashMap::with_capacity_and_hasher(counts.users * 9 / 8, RandomState::default()),
162 devices_to_compat_sessions: HashMap::with_capacity_and_hasher(
163 counts.devices * 9 / 8,
164 RandomState::default(),
165 ),
166 provider_id_mapping,
167 };
168
169 let progress_counter = progress.migrating_data(EntityType::Users, counts.users);
170 let (mas, state) = migrate_users(&mut synapse, mas, state, rng, progress_counter).await?;
171
172 let progress_counter = progress.migrating_data(EntityType::ThreePids, counts.threepids);
173 let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state, progress_counter).await?;
174
175 let progress_counter = progress.migrating_data(EntityType::ExternalIds, counts.external_ids);
176 let (mas, state) = migrate_external_ids(
177 &mut synapse,
178 mas,
179 rng,
180 state,
181 progress_counter,
182 ignore_missing_auth_providers,
183 )
184 .await?;
185
186 let progress_counter = progress.migrating_data(
187 EntityType::NonRefreshableAccessTokens,
188 counts.access_tokens - counts.refresh_tokens,
189 );
190 let (mas, state) =
191 migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state, progress_counter)
192 .await?;
193
194 let progress_counter =
195 progress.migrating_data(EntityType::RefreshableTokens, counts.refresh_tokens);
196 let (mas, state) =
197 migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state, progress_counter)
198 .await?;
199
200 let progress_counter = progress.migrating_data(EntityType::Devices, counts.devices);
201 let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state, progress_counter).await?;
202
203 synapse
204 .finish()
205 .await
206 .into_synapse("failed to close Synapse reader")?;
207
208 mas.finish(progress)
209 .await
210 .into_mas("failed to finalise MAS database")?;
211
212 Ok(())
213}
214
215#[tracing::instrument(skip_all, level = Level::INFO)]
216async fn migrate_users(
217 synapse: &mut SynapseReader<'_>,
218 mut mas: MasWriter,
219 mut state: MigrationState,
220 rng: &mut impl RngCore,
221 progress_counter: ProgressCounter,
222) -> Result<(MasWriter, MigrationState), Error> {
223 let start = Instant::now();
224 let progress_counter_ = progress_counter.clone();
225
226 let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseUser>(100 * 1024);
227
228 let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
231 let task = tokio::spawn(
232 async move {
233 let mut user_buffer = MasWriteBuffer::new(&mas);
234 let mut password_buffer = MasWriteBuffer::new(&mas);
235
236 while let Some(user) = rx.recv().await {
237 if user.appservice_id.is_some()
240 && user
241 .name
242 .0
243 .strip_suffix(&format!(":{}", state.server_name))
244 .is_some_and(|localpart| localpart.contains(':'))
245 {
246 tracing::warn!("AS user {} has invalid localpart, ignoring!", user.name.0);
247 continue;
248 }
249
250 let (mas_user, mas_password_opt) =
251 transform_user(&user, &state.server_name, &mut rng)?;
252
253 let mut flags = UserFlags::empty();
254 if bool::from(user.admin) {
255 flags |= UserFlags::IS_SYNAPSE_ADMIN;
256 }
257 if bool::from(user.deactivated) {
258 flags |= UserFlags::IS_DEACTIVATED;
259 }
260 if bool::from(user.is_guest) {
261 flags |= UserFlags::IS_GUEST;
262 }
263 if user.appservice_id.is_some() {
264 flags |= UserFlags::IS_APPSERVICE;
265
266 progress_counter.increment_skipped();
267
268 state.users.insert(
271 CompactString::new(&mas_user.username),
272 UserInfo {
273 mas_user_id: None,
274 flags,
275 },
276 );
277 continue;
278 }
279
280 state.users.insert(
281 CompactString::new(&mas_user.username),
282 UserInfo {
283 mas_user_id: Some(mas_user.user_id),
284 flags,
285 },
286 );
287
288 user_buffer
289 .write(&mut mas, mas_user)
290 .await
291 .into_mas("writing user")?;
292
293 if let Some(mas_password) = mas_password_opt {
294 password_buffer
295 .write(&mut mas, mas_password)
296 .await
297 .into_mas("writing password")?;
298 }
299
300 progress_counter.increment_migrated();
301 }
302
303 user_buffer
304 .finish(&mut mas)
305 .await
306 .into_mas("writing users")?;
307 password_buffer
308 .finish(&mut mas)
309 .await
310 .into_mas("writing passwords")?;
311
312 Ok((mas, state))
313 }
314 .instrument(tracing::info_span!("ingest_task")),
315 );
316
317 let res = synapse
320 .read_users()
321 .map_err(|e| e.into_synapse("reading users"))
322 .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
323 .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
324 .await;
325
326 let (mas, state) = task.await.into_join("user write task")??;
327
328 res?;
329
330 info!(
331 "{} users migrated ({} skipped) in {:.1}s",
332 progress_counter_.migrated(),
333 progress_counter_.skipped(),
334 Instant::now().duration_since(start).as_secs_f64()
335 );
336
337 Ok((mas, state))
338}
339
340#[tracing::instrument(skip_all, level = Level::INFO)]
341async fn migrate_threepids(
342 synapse: &mut SynapseReader<'_>,
343 mut mas: MasWriter,
344 rng: &mut impl RngCore,
345 state: MigrationState,
346 progress_counter: ProgressCounter,
347) -> Result<(MasWriter, MigrationState), Error> {
348 let start = Instant::now();
349 let progress_counter_ = progress_counter.clone();
350
351 let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseThreepid>(100 * 1024);
352
353 let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
356 let task = tokio::spawn(
357 async move {
358 let mut email_buffer = MasWriteBuffer::new(&mas);
359 let mut unsupported_buffer = MasWriteBuffer::new(&mas);
360
361 while let Some(threepid) = rx.recv().await {
362 let SynapseThreepid {
363 user_id: synapse_user_id,
364 medium,
365 address,
366 added_at,
367 } = threepid;
368 let created_at: DateTime<Utc> = added_at.into();
369
370 let username = synapse_user_id
371 .extract_localpart(&state.server_name)
372 .into_extract_localpart(synapse_user_id.clone())?
373 .to_owned();
374 let Some(user_infos) = state.users.get(username.as_str()).copied() else {
375 return Err(Error::MissingUserFromDependentTable {
376 table: "user_threepids".to_owned(),
377 user: synapse_user_id,
378 });
379 };
380
381 let Some(mas_user_id) = user_infos.mas_user_id else {
382 progress_counter.increment_skipped();
383 continue;
384 };
385
386 if medium == "email" {
387 email_buffer
388 .write(
389 &mut mas,
390 MasNewEmailThreepid {
391 user_id: mas_user_id,
392 user_email_id: Uuid::from(Ulid::from_datetime_with_source(
393 created_at.into(),
394 &mut rng,
395 )),
396 email: address,
397 created_at,
398 },
399 )
400 .await
401 .into_mas("writing email")?;
402 } else {
403 unsupported_buffer
404 .write(
405 &mut mas,
406 MasNewUnsupportedThreepid {
407 user_id: mas_user_id,
408 medium,
409 address,
410 created_at,
411 },
412 )
413 .await
414 .into_mas("writing unsupported threepid")?;
415 }
416
417 progress_counter.increment_migrated();
418 }
419
420 email_buffer
421 .finish(&mut mas)
422 .await
423 .into_mas("writing email threepids")?;
424 unsupported_buffer
425 .finish(&mut mas)
426 .await
427 .into_mas("writing unsupported threepids")?;
428
429 Ok((mas, state))
430 }
431 .instrument(tracing::info_span!("ingest_task")),
432 );
433
434 let res = synapse
437 .read_threepids()
438 .map_err(|e| e.into_synapse("reading threepids"))
439 .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
440 .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
441 .await;
442
443 let (mas, state) = task.await.into_join("threepid write task")??;
444
445 res?;
446
447 info!(
448 "{} third-party IDs migrated ({} skipped) in {:.1}s",
449 progress_counter_.migrated(),
450 progress_counter_.skipped(),
451 Instant::now().duration_since(start).as_secs_f64()
452 );
453
454 Ok((mas, state))
455}
456
457#[tracing::instrument(skip_all, level = Level::INFO)]
458async fn migrate_external_ids(
459 synapse: &mut SynapseReader<'_>,
460 mut mas: MasWriter,
461 rng: &mut impl RngCore,
462 state: MigrationState,
463 progress_counter: ProgressCounter,
464 ignore_missing_auth_providers: bool,
465) -> Result<(MasWriter, MigrationState), Error> {
466 let start = Instant::now();
467 let progress_counter_ = progress_counter.clone();
468
469 let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseExternalId>(100 * 1024);
470
471 let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
474 let task = tokio::spawn(
475 async move {
476 let mut write_buffer = MasWriteBuffer::new(&mas);
477
478 while let Some(extid) = rx.recv().await {
479 let SynapseExternalId {
480 user_id: synapse_user_id,
481 auth_provider,
482 external_id: subject,
483 } = extid;
484 let username = synapse_user_id
485 .extract_localpart(&state.server_name)
486 .into_extract_localpart(synapse_user_id.clone())?
487 .to_owned();
488 let Some(user_infos) = state.users.get(username.as_str()).copied() else {
489 return Err(Error::MissingUserFromDependentTable {
490 table: "user_external_ids".to_owned(),
491 user: synapse_user_id,
492 });
493 };
494
495 let Some(mas_user_id) = user_infos.mas_user_id else {
496 progress_counter.increment_skipped();
497 continue;
498 };
499
500 let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider)
501 else {
502 if ignore_missing_auth_providers {
503 progress_counter.increment_skipped();
504 continue;
505 }
506 return Err(Error::MissingAuthProviderMapping {
507 synapse_id: auth_provider,
508 user: synapse_user_id,
509 });
510 };
511
512 let user_created_ts = Ulid::from(mas_user_id.get()).datetime();
515
516 let link_id: Uuid =
517 Ulid::from_datetime_with_source(user_created_ts, &mut rng).into();
518
519 write_buffer
520 .write(
521 &mut mas,
522 MasNewUpstreamOauthLink {
523 link_id,
524 user_id: mas_user_id,
525 upstream_provider_id,
526 subject,
527 created_at: user_created_ts.into(),
528 },
529 )
530 .await
531 .into_mas("failed to write upstream link")?;
532
533 progress_counter.increment_migrated();
534 }
535
536 write_buffer
537 .finish(&mut mas)
538 .await
539 .into_mas("writing upstream links")?;
540
541 Ok((mas, state))
542 }
543 .instrument(tracing::info_span!("ingest_task")),
544 );
545
546 let res = synapse
549 .read_user_external_ids()
550 .map_err(|e| e.into_synapse("reading external ID"))
551 .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
552 .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
553 .await;
554
555 let (mas, state) = task.await.into_join("external IDs write task")??;
556
557 res?;
558
559 info!(
560 "{} upstream links (external IDs) migrated ({} skipped) in {:.1}s",
561 progress_counter_.migrated(),
562 progress_counter_.skipped(),
563 Instant::now().duration_since(start).as_secs_f64()
564 );
565
566 Ok((mas, state))
567}
568
569#[tracing::instrument(skip_all, level = Level::INFO)]
578async fn migrate_devices(
579 synapse: &mut SynapseReader<'_>,
580 mut mas: MasWriter,
581 rng: &mut impl RngCore,
582 mut state: MigrationState,
583 progress_counter: ProgressCounter,
584) -> Result<(MasWriter, MigrationState), Error> {
585 let start = Instant::now();
586 let progress_counter_ = progress_counter.clone();
587
588 let (tx, mut rx) = tokio::sync::mpsc::channel(100 * 1024);
589
590 let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
593 let task = tokio::spawn(
594 async move {
595 let mut write_buffer = MasWriteBuffer::new(&mas);
596
597 while let Some(device) = rx.recv().await {
598 let SynapseDevice {
599 user_id: synapse_user_id,
600 device_id,
601 display_name,
602 last_seen,
603 ip,
604 user_agent,
605 } = device;
606 let username = synapse_user_id
607 .extract_localpart(&state.server_name)
608 .into_extract_localpart(synapse_user_id.clone())?
609 .to_owned();
610 let Some(user_infos) = state.users.get(username.as_str()).copied() else {
611 return Err(Error::MissingUserFromDependentTable {
612 table: "devices".to_owned(),
613 user: synapse_user_id,
614 });
615 };
616
617 let Some(mas_user_id) = user_infos.mas_user_id else {
618 progress_counter.increment_skipped();
619 continue;
620 };
621
622 if user_infos.flags.is_deactivated()
623 || user_infos.flags.is_guest()
624 || user_infos.flags.is_appservice()
625 {
626 continue;
627 }
628
629 let session_id = *state
630 .devices_to_compat_sessions
631 .entry((mas_user_id, CompactString::new(&device_id)))
632 .or_insert_with(||
633 Ulid::with_source(&mut rng).into());
636 let created_at = Ulid::from(session_id).datetime().into();
637
638 let last_active_ip = ip.filter(|ip| ip != "-").and_then(|ip| {
644 ip.parse()
645 .map_err(|e| {
646 tracing::warn!(
647 error = &e as &dyn std::error::Error,
648 mxid = %synapse_user_id,
649 %device_id,
650 %ip,
651 "Failed to parse device IP, ignoring"
652 );
653 })
654 .ok()
655 });
656
657 write_buffer
658 .write(
659 &mut mas,
660 MasNewCompatSession {
661 session_id,
662 user_id: mas_user_id,
663 device_id: Some(device_id),
664 human_name: display_name,
665 created_at,
666 is_synapse_admin: user_infos.flags.is_synapse_admin(),
667 last_active_at: last_seen.map(DateTime::from),
668 last_active_ip,
669 user_agent,
670 },
671 )
672 .await
673 .into_mas("writing compat sessions")?;
674
675 progress_counter.increment_migrated();
676 }
677
678 write_buffer
679 .finish(&mut mas)
680 .await
681 .into_mas("writing compat sessions")?;
682
683 Ok((mas, state))
684 }
685 .instrument(tracing::info_span!("ingest_task")),
686 );
687
688 let res = synapse
691 .read_devices()
692 .map_err(|e| e.into_synapse("reading devices"))
693 .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
694 .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
695 .await;
696
697 let (mas, state) = task.await.into_join("device write task")??;
698
699 res?;
700
701 info!(
702 "{} devices migrated ({} skipped) in {:.1}s",
703 progress_counter_.migrated(),
704 progress_counter_.skipped(),
705 Instant::now().duration_since(start).as_secs_f64()
706 );
707
708 Ok((mas, state))
709}
710
711#[tracing::instrument(skip_all, level = Level::INFO)]
714async fn migrate_unrefreshable_access_tokens(
715 synapse: &mut SynapseReader<'_>,
716 mut mas: MasWriter,
717 clock: &dyn Clock,
718 rng: &mut impl RngCore,
719 mut state: MigrationState,
720 progress_counter: ProgressCounter,
721) -> Result<(MasWriter, MigrationState), Error> {
722 let start = Instant::now();
723 let progress_counter_ = progress_counter.clone();
724
725 let (tx, mut rx) = tokio::sync::mpsc::channel(100 * 1024);
726
727 let now = clock.now();
728 let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
731 let task = tokio::spawn(
732 async move {
733 let mut write_buffer = MasWriteBuffer::new(&mas);
734 let mut deviceless_session_write_buffer = MasWriteBuffer::new(&mas);
735
736 while let Some(token) = rx.recv().await {
737 let SynapseAccessToken {
738 user_id: synapse_user_id,
739 device_id,
740 token,
741 valid_until_ms,
742 last_validated,
743 } = token;
744 let username = synapse_user_id
745 .extract_localpart(&state.server_name)
746 .into_extract_localpart(synapse_user_id.clone())?
747 .to_owned();
748 let Some(user_infos) = state.users.get(username.as_str()).copied() else {
749 return Err(Error::MissingUserFromDependentTable {
750 table: "access_tokens".to_owned(),
751 user: synapse_user_id,
752 });
753 };
754
755 let Some(mas_user_id) = user_infos.mas_user_id else {
756 progress_counter.increment_skipped();
757 continue;
758 };
759
760 if user_infos.flags.is_deactivated()
761 || user_infos.flags.is_guest()
762 || user_infos.flags.is_appservice()
763 {
764 progress_counter.increment_skipped();
765 continue;
766 }
767
768 let created_at = last_validated.map_or_else(|| now, DateTime::from);
772
773 let session_id = if let Some(device_id) = device_id {
774 *state
776 .devices_to_compat_sessions
777 .entry((mas_user_id, CompactString::new(&device_id)))
778 .or_insert_with(|| {
779 Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng))
780 })
781 } else {
782 let deviceless_session_id =
785 Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
786
787 deviceless_session_write_buffer
788 .write(
789 &mut mas,
790 MasNewCompatSession {
791 session_id: deviceless_session_id,
792 user_id: mas_user_id,
793 device_id: None,
794 human_name: None,
795 created_at,
796 is_synapse_admin: false,
797 last_active_at: None,
798 last_active_ip: None,
799 user_agent: None,
800 },
801 )
802 .await
803 .into_mas("failed to write deviceless compat sessions")?;
804
805 deviceless_session_id
806 };
807
808 let token_id =
809 Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
810
811 write_buffer
812 .write(
813 &mut mas,
814 MasNewCompatAccessToken {
815 token_id,
816 session_id,
817 access_token: token,
818 created_at,
819 expires_at: valid_until_ms.map(DateTime::from),
820 },
821 )
822 .await
823 .into_mas("writing compat access tokens")?;
824
825 progress_counter.increment_migrated();
826 }
827 write_buffer
828 .finish(&mut mas)
829 .await
830 .into_mas("writing compat access tokens")?;
831 deviceless_session_write_buffer
832 .finish(&mut mas)
833 .await
834 .into_mas("writing deviceless compat sessions")?;
835
836 Ok((mas, state))
837 }
838 .instrument(tracing::info_span!("ingest_task")),
839 );
840
841 let res = synapse
844 .read_unrefreshable_access_tokens()
845 .map_err(|e| e.into_synapse("reading tokens"))
846 .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
847 .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
848 .await;
849
850 let (mas, state) = task.await.into_join("token write task")??;
851
852 res?;
853
854 info!(
855 "{} non-refreshable access tokens migrated ({} skipped) in {:.1}s",
856 progress_counter_.migrated(),
857 progress_counter_.skipped(),
858 Instant::now().duration_since(start).as_secs_f64()
859 );
860
861 Ok((mas, state))
862}
863
864#[tracing::instrument(skip_all, level = Level::INFO)]
867async fn migrate_refreshable_token_pairs(
868 synapse: &mut SynapseReader<'_>,
869 mut mas: MasWriter,
870 clock: &dyn Clock,
871 rng: &mut impl RngCore,
872 mut state: MigrationState,
873 progress_counter: ProgressCounter,
874) -> Result<(MasWriter, MigrationState), Error> {
875 let start = Instant::now();
876 let progress_counter_ = progress_counter.clone();
877
878 let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseRefreshableTokenPair>(100 * 1024);
879
880 let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
883 let now = clock.now();
884 let task = tokio::spawn(
885 async move {
886 let mut access_token_write_buffer = MasWriteBuffer::new(&mas);
887 let mut refresh_token_write_buffer = MasWriteBuffer::new(&mas);
888
889 while let Some(token) = rx.recv().await {
890 let SynapseRefreshableTokenPair {
891 user_id: synapse_user_id,
892 device_id,
893 access_token,
894 refresh_token,
895 valid_until_ms,
896 last_validated,
897 } = token;
898
899 let username = synapse_user_id
900 .extract_localpart(&state.server_name)
901 .into_extract_localpart(synapse_user_id.clone())?
902 .to_owned();
903 let Some(user_infos) = state.users.get(username.as_str()).copied() else {
904 return Err(Error::MissingUserFromDependentTable {
905 table: "refresh_tokens".to_owned(),
906 user: synapse_user_id,
907 });
908 };
909
910 let Some(mas_user_id) = user_infos.mas_user_id else {
911 progress_counter.increment_skipped();
912 continue;
913 };
914
915 if user_infos.flags.is_deactivated()
916 || user_infos.flags.is_guest()
917 || user_infos.flags.is_appservice()
918 {
919 progress_counter.increment_skipped();
920 continue;
921 }
922
923 let created_at = last_validated.map_or_else(|| now, DateTime::from);
927
928 let session_id = *state
930 .devices_to_compat_sessions
931 .entry((mas_user_id, CompactString::new(&device_id)))
932 .or_insert_with(|| {
933 Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng))
934 });
935
936 let access_token_id =
937 Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
938 let refresh_token_id =
939 Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
940
941 access_token_write_buffer
942 .write(
943 &mut mas,
944 MasNewCompatAccessToken {
945 token_id: access_token_id,
946 session_id,
947 access_token,
948 created_at,
949 expires_at: valid_until_ms.map(DateTime::from),
950 },
951 )
952 .await
953 .into_mas("writing compat access tokens")?;
954 refresh_token_write_buffer
955 .write(
956 &mut mas,
957 MasNewCompatRefreshToken {
958 refresh_token_id,
959 session_id,
960 access_token_id,
961 refresh_token,
962 created_at,
963 },
964 )
965 .await
966 .into_mas("writing compat refresh tokens")?;
967
968 progress_counter.increment_migrated();
969 }
970
971 access_token_write_buffer
972 .finish(&mut mas)
973 .await
974 .into_mas("writing compat access tokens")?;
975
976 refresh_token_write_buffer
977 .finish(&mut mas)
978 .await
979 .into_mas("writing compat refresh tokens")?;
980 Ok((mas, state))
981 }
982 .instrument(tracing::info_span!("ingest_task")),
983 );
984
985 let res = synapse
988 .read_refreshable_token_pairs()
989 .map_err(|e| e.into_synapse("reading refresh token pairs"))
990 .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
991 .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
992 .await;
993
994 let (mas, state) = task.await.into_join("refresh token write task")??;
995
996 res?;
997
998 info!(
999 "{} refreshable token pairs migrated ({} skipped) in {:.1}s",
1000 progress_counter_.migrated(),
1001 progress_counter_.skipped(),
1002 Instant::now().duration_since(start).as_secs_f64()
1003 );
1004
1005 Ok((mas, state))
1006}
1007
1008fn transform_user(
1009 user: &SynapseUser,
1010 server_name: &str,
1011 rng: &mut impl RngCore,
1012) -> Result<(MasNewUser, Option<MasNewUserPassword>), Error> {
1013 let username = user
1014 .name
1015 .extract_localpart(server_name)
1016 .into_extract_localpart(user.name.clone())?
1017 .to_owned();
1018
1019 let user_id = Uuid::from(Ulid::from_datetime_with_source(
1020 DateTime::<Utc>::from(user.creation_ts).into(),
1021 rng,
1022 ))
1023 .try_into()
1024 .expect("ULID generation lead to a nil UUID, this is a bug!");
1025
1026 let new_user = MasNewUser {
1027 user_id,
1028 username,
1029 created_at: user.creation_ts.into(),
1030 locked_at: user.locked.then_some(user.creation_ts.into()),
1031 deactivated_at: bool::from(user.deactivated).then_some(user.creation_ts.into()),
1032 can_request_admin: bool::from(user.admin),
1033 is_guest: bool::from(user.is_guest),
1034 };
1035
1036 let mas_password = user
1037 .password_hash
1038 .clone()
1039 .map(|password_hash| MasNewUserPassword {
1040 user_password_id: Uuid::from(Ulid::from_datetime_with_source(
1041 DateTime::<Utc>::from(user.creation_ts).into(),
1042 rng,
1043 )),
1044 user_id: new_user.user_id,
1045 hashed_password: password_hash,
1046 created_at: new_user.created_at,
1047 });
1048
1049 Ok((new_user, mas_password))
1050}