syn2mas/
migration.rs

1// Copyright 2024, 2025 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
4// Please see LICENSE files in the repository root for full details.
5
6//! # Migration
7//!
8//! This module provides the high-level logic for performing the Synapse-to-MAS
9//! database migration.
10//!
11//! This module does not implement any of the safety checks that should be run
12//! *before* the migration.
13
14use std::time::Instant;
15
16use chrono::{DateTime, Utc};
17use compact_str::CompactString;
18use futures_util::{SinkExt, StreamExt as _, TryFutureExt, TryStreamExt as _};
19use mas_data_model::Clock;
20use rand::{RngCore, SeedableRng};
21use thiserror::Error;
22use thiserror_ext::ContextInto;
23use tokio_util::sync::PollSender;
24use tracing::{Instrument as _, Level, info};
25use ulid::Ulid;
26use uuid::{NonNilUuid, Uuid};
27
28use crate::{
29    HashMap, ProgressCounter, RandomState, SynapseReader,
30    mas_writer::{
31        self, MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
32        MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
33        MasNewUserPassword, MasWriteBuffer, MasWriter,
34    },
35    progress::{EntityType, Progress},
36    synapse_reader::{
37        self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
38        SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
39    },
40};
41
/// Errors that can abort the Synapse-to-MAS migration.
///
/// Variants carrying a `context` field are built at the call sites in this
/// module through the `into_*` helpers (e.g. `into_synapse("counting users")`),
/// which presumably come from the `ContextInto` derive.
#[derive(Debug, Error, ContextInto)]
pub enum Error {
    /// Reading from the Synapse database failed; `context` names the
    /// operation that was in progress.
    #[error("error when reading synapse DB ({context}): {source}")]
    Synapse {
        source: synapse_reader::Error,
        context: String,
    },
    /// Writing to the MAS database failed; `context` names the operation
    /// that was in progress.
    #[error("error when writing to MAS DB ({context}): {source}")]
    Mas {
        source: mas_writer::Error,
        context: String,
    },
    /// The localpart could not be extracted from the Matrix user ID `user`.
    #[error("failed to extract localpart of {user:?}: {source}")]
    ExtractLocalpart {
        source: ExtractLocalpartError,
        user: FullUserId,
    },
    /// The channel feeding a write task rejected a row read from Synapse.
    #[error("channel closed")]
    ChannelClosed,

    /// A spawned write task could not be joined; `context` names the task.
    #[error("task failed ({context}): {source}")]
    Join {
        source: tokio::task::JoinError,
        context: String,
    },

    /// A row in a dependent table (`table`) refers to `user`, but that user
    /// was not recorded while migrating the users table.
    #[error("user {user} was not found for migration but a row in {table} was found for them")]
    MissingUserFromDependentTable { table: String, user: FullUserId },
    /// An external ID refers to a Synapse auth provider for which no MAS
    /// upstream provider mapping was supplied.
    #[error(
        "missing a mapping for the auth provider with ID {synapse_id:?} (used by {user} and maybe other users)"
    )]
    MissingAuthProviderMapping {
        /// `auth_provider` ID of the provider in Synapse, for which we have no
        /// mapping
        synapse_id: String,
        /// a user that is using this auth provider
        user: FullUserId,
    },
}
81
bitflags::bitflags! {
    /// Compact set of per-user properties recorded while migrating the
    /// users table and consulted when migrating dependent tables.
    #[derive(Debug, Clone, Copy)]
    struct UserFlags: u8 {
        /// The Synapse user row had its `admin` field set.
        const IS_SYNAPSE_ADMIN = 0b0000_0001;
        /// The Synapse user row had its `deactivated` field set.
        const IS_DEACTIVATED = 0b0000_0010;
        /// The Synapse user row had its `is_guest` field set.
        const IS_GUEST = 0b0000_0100;
        /// The Synapse user row had an `appservice_id`.
        const IS_APPSERVICE = 0b0000_1000;
    }
}
91
92impl UserFlags {
93    const fn is_deactivated(self) -> bool {
94        self.contains(UserFlags::IS_DEACTIVATED)
95    }
96
97    const fn is_guest(self) -> bool {
98        self.contains(UserFlags::IS_GUEST)
99    }
100
101    const fn is_synapse_admin(self) -> bool {
102        self.contains(UserFlags::IS_SYNAPSE_ADMIN)
103    }
104
105    const fn is_appservice(self) -> bool {
106        self.contains(UserFlags::IS_APPSERVICE)
107    }
108}
109
/// What the migration remembers about a user after the users table has been
/// processed.
#[derive(Debug, Clone, Copy)]
struct UserInfo {
    /// ID of the user in MAS. `None` for appservice users, which are
    /// recorded in the lookup table but not written to MAS.
    mas_user_id: Option<NonNilUuid>,
    /// Properties of the user carried over from the Synapse row.
    flags: UserFlags,
}
115
/// State threaded through the migration stages: each stage takes it by value,
/// may add to it, and hands it back for the next stage.
struct MigrationState {
    /// The server name we're migrating from
    server_name: String,

    /// Lookup table from user localpart to that user's info
    users: HashMap<CompactString, UserInfo>,

    /// Mapping of MAS user ID + device ID to a MAS compat session ID.
    devices_to_compat_sessions: HashMap<(NonNilUuid, CompactString), Uuid>,

    /// A mapping of Synapse external ID providers to MAS upstream OAuth 2.0
    /// provider ID
    provider_id_mapping: std::collections::HashMap<String, Uuid>,
}
130
131/// Performs a migration from Synapse's database to MAS' database.
132///
133/// # Panics
134///
135/// - If there are more than `usize::MAX` users
136///
137/// # Errors
138///
139/// Errors are returned under the following circumstances:
140///
141/// - An underlying database access error, either to MAS or to Synapse.
142/// - Invalid data in the Synapse database.
143#[expect(clippy::implicit_hasher)]
144#[allow(clippy::too_many_arguments)]
145pub async fn migrate(
146    mut synapse: SynapseReader<'_>,
147    mas: MasWriter,
148    server_name: String,
149    clock: &dyn Clock,
150    rng: &mut impl RngCore,
151    provider_id_mapping: std::collections::HashMap<String, Uuid>,
152    progress: &Progress,
153    ignore_missing_auth_providers: bool,
154) -> Result<(), Error> {
155    let counts = synapse.count_rows().await.into_synapse("counting users")?;
156
157    let state = MigrationState {
158        server_name,
159        // We oversize the hashmaps, as the estimates are innaccurate, and we would like to avoid
160        // reallocations.
161        users: HashMap::with_capacity_and_hasher(counts.users * 9 / 8, RandomState::default()),
162        devices_to_compat_sessions: HashMap::with_capacity_and_hasher(
163            counts.devices * 9 / 8,
164            RandomState::default(),
165        ),
166        provider_id_mapping,
167    };
168
169    let progress_counter = progress.migrating_data(EntityType::Users, counts.users);
170    let (mas, state) = migrate_users(&mut synapse, mas, state, rng, progress_counter).await?;
171
172    let progress_counter = progress.migrating_data(EntityType::ThreePids, counts.threepids);
173    let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state, progress_counter).await?;
174
175    let progress_counter = progress.migrating_data(EntityType::ExternalIds, counts.external_ids);
176    let (mas, state) = migrate_external_ids(
177        &mut synapse,
178        mas,
179        rng,
180        state,
181        progress_counter,
182        ignore_missing_auth_providers,
183    )
184    .await?;
185
186    let progress_counter = progress.migrating_data(
187        EntityType::NonRefreshableAccessTokens,
188        counts.access_tokens - counts.refresh_tokens,
189    );
190    let (mas, state) =
191        migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state, progress_counter)
192            .await?;
193
194    let progress_counter =
195        progress.migrating_data(EntityType::RefreshableTokens, counts.refresh_tokens);
196    let (mas, state) =
197        migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state, progress_counter)
198            .await?;
199
200    let progress_counter = progress.migrating_data(EntityType::Devices, counts.devices);
201    let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state, progress_counter).await?;
202
203    synapse
204        .finish()
205        .await
206        .into_synapse("failed to close Synapse reader")?;
207
208    mas.finish(progress)
209        .await
210        .into_mas("failed to finalise MAS database")?;
211
212    Ok(())
213}
214
215#[tracing::instrument(skip_all, level = Level::INFO)]
216async fn migrate_users(
217    synapse: &mut SynapseReader<'_>,
218    mut mas: MasWriter,
219    mut state: MigrationState,
220    rng: &mut impl RngCore,
221    progress_counter: ProgressCounter,
222) -> Result<(MasWriter, MigrationState), Error> {
223    let start = Instant::now();
224    let progress_counter_ = progress_counter.clone();
225
226    let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseUser>(100 * 1024);
227
228    // create a new RNG seeded from the passed RNG so that we can move it into the
229    // spawned task
230    let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
231    let task = tokio::spawn(
232        async move {
233            let mut user_buffer = MasWriteBuffer::new(&mas);
234            let mut password_buffer = MasWriteBuffer::new(&mas);
235
236            while let Some(user) = rx.recv().await {
237                // Handling an edge case: some AS users may have invalid localparts containing
238                // extra `:` characters. These users are ignored and a warning is logged.
239                if user.appservice_id.is_some()
240                    && user
241                        .name
242                        .0
243                        .strip_suffix(&format!(":{}", state.server_name))
244                        .is_some_and(|localpart| localpart.contains(':'))
245                {
246                    tracing::warn!("AS user {} has invalid localpart, ignoring!", user.name.0);
247                    continue;
248                }
249
250                let (mas_user, mas_password_opt) =
251                    transform_user(&user, &state.server_name, &mut rng)?;
252
253                let mut flags = UserFlags::empty();
254                if bool::from(user.admin) {
255                    flags |= UserFlags::IS_SYNAPSE_ADMIN;
256                }
257                if bool::from(user.deactivated) {
258                    flags |= UserFlags::IS_DEACTIVATED;
259                }
260                if bool::from(user.is_guest) {
261                    flags |= UserFlags::IS_GUEST;
262                }
263                if user.appservice_id.is_some() {
264                    flags |= UserFlags::IS_APPSERVICE;
265
266                    progress_counter.increment_skipped();
267
268                    // Special case for appservice users: we don't insert them into the database
269                    // We just record the user's information in the state and continue
270                    state.users.insert(
271                        CompactString::new(&mas_user.username),
272                        UserInfo {
273                            mas_user_id: None,
274                            flags,
275                        },
276                    );
277                    continue;
278                }
279
280                state.users.insert(
281                    CompactString::new(&mas_user.username),
282                    UserInfo {
283                        mas_user_id: Some(mas_user.user_id),
284                        flags,
285                    },
286                );
287
288                user_buffer
289                    .write(&mut mas, mas_user)
290                    .await
291                    .into_mas("writing user")?;
292
293                if let Some(mas_password) = mas_password_opt {
294                    password_buffer
295                        .write(&mut mas, mas_password)
296                        .await
297                        .into_mas("writing password")?;
298                }
299
300                progress_counter.increment_migrated();
301            }
302
303            user_buffer
304                .finish(&mut mas)
305                .await
306                .into_mas("writing users")?;
307            password_buffer
308                .finish(&mut mas)
309                .await
310                .into_mas("writing passwords")?;
311
312            Ok((mas, state))
313        }
314        .instrument(tracing::info_span!("ingest_task")),
315    );
316
317    // In case this has an error, we still want to join the task, so we look at the
318    // error later
319    let res = synapse
320        .read_users()
321        .map_err(|e| e.into_synapse("reading users"))
322        .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
323        .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
324        .await;
325
326    let (mas, state) = task.await.into_join("user write task")??;
327
328    res?;
329
330    info!(
331        "{} users migrated ({} skipped) in {:.1}s",
332        progress_counter_.migrated(),
333        progress_counter_.skipped(),
334        Instant::now().duration_since(start).as_secs_f64()
335    );
336
337    Ok((mas, state))
338}
339
340#[tracing::instrument(skip_all, level = Level::INFO)]
341async fn migrate_threepids(
342    synapse: &mut SynapseReader<'_>,
343    mut mas: MasWriter,
344    rng: &mut impl RngCore,
345    state: MigrationState,
346    progress_counter: ProgressCounter,
347) -> Result<(MasWriter, MigrationState), Error> {
348    let start = Instant::now();
349    let progress_counter_ = progress_counter.clone();
350
351    let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseThreepid>(100 * 1024);
352
353    // create a new RNG seeded from the passed RNG so that we can move it into the
354    // spawned task
355    let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
356    let task = tokio::spawn(
357        async move {
358            let mut email_buffer = MasWriteBuffer::new(&mas);
359            let mut unsupported_buffer = MasWriteBuffer::new(&mas);
360
361            while let Some(threepid) = rx.recv().await {
362                let SynapseThreepid {
363                    user_id: synapse_user_id,
364                    medium,
365                    address,
366                    added_at,
367                } = threepid;
368                let created_at: DateTime<Utc> = added_at.into();
369
370                let username = synapse_user_id
371                    .extract_localpart(&state.server_name)
372                    .into_extract_localpart(synapse_user_id.clone())?
373                    .to_owned();
374                let Some(user_infos) = state.users.get(username.as_str()).copied() else {
375                    return Err(Error::MissingUserFromDependentTable {
376                        table: "user_threepids".to_owned(),
377                        user: synapse_user_id,
378                    });
379                };
380
381                let Some(mas_user_id) = user_infos.mas_user_id else {
382                    progress_counter.increment_skipped();
383                    continue;
384                };
385
386                if medium == "email" {
387                    email_buffer
388                        .write(
389                            &mut mas,
390                            MasNewEmailThreepid {
391                                user_id: mas_user_id,
392                                user_email_id: Uuid::from(Ulid::from_datetime_with_source(
393                                    created_at.into(),
394                                    &mut rng,
395                                )),
396                                email: address,
397                                created_at,
398                            },
399                        )
400                        .await
401                        .into_mas("writing email")?;
402                } else {
403                    unsupported_buffer
404                        .write(
405                            &mut mas,
406                            MasNewUnsupportedThreepid {
407                                user_id: mas_user_id,
408                                medium,
409                                address,
410                                created_at,
411                            },
412                        )
413                        .await
414                        .into_mas("writing unsupported threepid")?;
415                }
416
417                progress_counter.increment_migrated();
418            }
419
420            email_buffer
421                .finish(&mut mas)
422                .await
423                .into_mas("writing email threepids")?;
424            unsupported_buffer
425                .finish(&mut mas)
426                .await
427                .into_mas("writing unsupported threepids")?;
428
429            Ok((mas, state))
430        }
431        .instrument(tracing::info_span!("ingest_task")),
432    );
433
434    // In case this has an error, we still want to join the task, so we look at the
435    // error later
436    let res = synapse
437        .read_threepids()
438        .map_err(|e| e.into_synapse("reading threepids"))
439        .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
440        .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
441        .await;
442
443    let (mas, state) = task.await.into_join("threepid write task")??;
444
445    res?;
446
447    info!(
448        "{} third-party IDs migrated ({} skipped) in {:.1}s",
449        progress_counter_.migrated(),
450        progress_counter_.skipped(),
451        Instant::now().duration_since(start).as_secs_f64()
452    );
453
454    Ok((mas, state))
455}
456
457#[tracing::instrument(skip_all, level = Level::INFO)]
458async fn migrate_external_ids(
459    synapse: &mut SynapseReader<'_>,
460    mut mas: MasWriter,
461    rng: &mut impl RngCore,
462    state: MigrationState,
463    progress_counter: ProgressCounter,
464    ignore_missing_auth_providers: bool,
465) -> Result<(MasWriter, MigrationState), Error> {
466    let start = Instant::now();
467    let progress_counter_ = progress_counter.clone();
468
469    let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseExternalId>(100 * 1024);
470
471    // create a new RNG seeded from the passed RNG so that we can move it into the
472    // spawned task
473    let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
474    let task = tokio::spawn(
475        async move {
476            let mut write_buffer = MasWriteBuffer::new(&mas);
477
478            while let Some(extid) = rx.recv().await {
479                let SynapseExternalId {
480                    user_id: synapse_user_id,
481                    auth_provider,
482                    external_id: subject,
483                } = extid;
484                let username = synapse_user_id
485                    .extract_localpart(&state.server_name)
486                    .into_extract_localpart(synapse_user_id.clone())?
487                    .to_owned();
488                let Some(user_infos) = state.users.get(username.as_str()).copied() else {
489                    return Err(Error::MissingUserFromDependentTable {
490                        table: "user_external_ids".to_owned(),
491                        user: synapse_user_id,
492                    });
493                };
494
495                let Some(mas_user_id) = user_infos.mas_user_id else {
496                    progress_counter.increment_skipped();
497                    continue;
498                };
499
500                let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider)
501                else {
502                    if ignore_missing_auth_providers {
503                        progress_counter.increment_skipped();
504                        continue;
505                    }
506                    return Err(Error::MissingAuthProviderMapping {
507                        synapse_id: auth_provider,
508                        user: synapse_user_id,
509                    });
510                };
511
512                // To save having to store user creation times, extract it from the ULID
513                // This gives millisecond precision — good enough.
514                let user_created_ts = Ulid::from(mas_user_id.get()).datetime();
515
516                let link_id: Uuid =
517                    Ulid::from_datetime_with_source(user_created_ts, &mut rng).into();
518
519                write_buffer
520                    .write(
521                        &mut mas,
522                        MasNewUpstreamOauthLink {
523                            link_id,
524                            user_id: mas_user_id,
525                            upstream_provider_id,
526                            subject,
527                            created_at: user_created_ts.into(),
528                        },
529                    )
530                    .await
531                    .into_mas("failed to write upstream link")?;
532
533                progress_counter.increment_migrated();
534            }
535
536            write_buffer
537                .finish(&mut mas)
538                .await
539                .into_mas("writing upstream links")?;
540
541            Ok((mas, state))
542        }
543        .instrument(tracing::info_span!("ingest_task")),
544    );
545
546    // In case this has an error, we still want to join the task, so we look at the
547    // error later
548    let res = synapse
549        .read_user_external_ids()
550        .map_err(|e| e.into_synapse("reading external ID"))
551        .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
552        .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
553        .await;
554
555    let (mas, state) = task.await.into_join("external IDs write task")??;
556
557    res?;
558
559    info!(
560        "{} upstream links (external IDs) migrated ({} skipped) in {:.1}s",
561        progress_counter_.migrated(),
562        progress_counter_.skipped(),
563        Instant::now().duration_since(start).as_secs_f64()
564    );
565
566    Ok((mas, state))
567}
568
569/// Migrate devices from Synapse to MAS (as compat sessions).
570///
571/// In order to get the right session creation timestamps, the access tokens
572/// must counterintuitively be migrated first, with the ULIDs passed in as
573/// `devices`.
574///
575/// This is because only access tokens store a timestamp that in any way
576/// resembles a creation timestamp.
577#[tracing::instrument(skip_all, level = Level::INFO)]
578async fn migrate_devices(
579    synapse: &mut SynapseReader<'_>,
580    mut mas: MasWriter,
581    rng: &mut impl RngCore,
582    mut state: MigrationState,
583    progress_counter: ProgressCounter,
584) -> Result<(MasWriter, MigrationState), Error> {
585    let start = Instant::now();
586    let progress_counter_ = progress_counter.clone();
587
588    let (tx, mut rx) = tokio::sync::mpsc::channel(100 * 1024);
589
590    // create a new RNG seeded from the passed RNG so that we can move it into the
591    // spawned task
592    let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
593    let task = tokio::spawn(
594        async move {
595            let mut write_buffer = MasWriteBuffer::new(&mas);
596
597            while let Some(device) = rx.recv().await {
598                let SynapseDevice {
599                    user_id: synapse_user_id,
600                    device_id,
601                    display_name,
602                    last_seen,
603                    ip,
604                    user_agent,
605                } = device;
606                let username = synapse_user_id
607                    .extract_localpart(&state.server_name)
608                    .into_extract_localpart(synapse_user_id.clone())?
609                    .to_owned();
610                let Some(user_infos) = state.users.get(username.as_str()).copied() else {
611                    return Err(Error::MissingUserFromDependentTable {
612                        table: "devices".to_owned(),
613                        user: synapse_user_id,
614                    });
615                };
616
617                let Some(mas_user_id) = user_infos.mas_user_id else {
618                    progress_counter.increment_skipped();
619                    continue;
620                };
621
622                if user_infos.flags.is_deactivated()
623                    || user_infos.flags.is_guest()
624                    || user_infos.flags.is_appservice()
625                {
626                    continue;
627                }
628
629                let session_id = *state
630                    .devices_to_compat_sessions
631                    .entry((mas_user_id, CompactString::new(&device_id)))
632                    .or_insert_with(||
633                // We don't have a creation time for this device (as it has no access token),
634                // so use now as a least-evil fallback.
635                Ulid::with_source(&mut rng).into());
636                let created_at = Ulid::from(session_id).datetime().into();
637
638                // As we're using a real IP type in the MAS database, it is possible
639                // that we encounter invalid IP addresses in the Synapse database.
640                // In that case, we should ignore them, but still log a warning.
641                // One special case: Synapse will record '-' as IP in some cases, we don't want
642                // to log about those
643                let last_active_ip = ip.filter(|ip| ip != "-").and_then(|ip| {
644                    ip.parse()
645                        .map_err(|e| {
646                            tracing::warn!(
647                                error = &e as &dyn std::error::Error,
648                                mxid = %synapse_user_id,
649                                %device_id,
650                                %ip,
651                                "Failed to parse device IP, ignoring"
652                            );
653                        })
654                        .ok()
655                });
656
657                write_buffer
658                    .write(
659                        &mut mas,
660                        MasNewCompatSession {
661                            session_id,
662                            user_id: mas_user_id,
663                            device_id: Some(device_id),
664                            human_name: display_name,
665                            created_at,
666                            is_synapse_admin: user_infos.flags.is_synapse_admin(),
667                            last_active_at: last_seen.map(DateTime::from),
668                            last_active_ip,
669                            user_agent,
670                        },
671                    )
672                    .await
673                    .into_mas("writing compat sessions")?;
674
675                progress_counter.increment_migrated();
676            }
677
678            write_buffer
679                .finish(&mut mas)
680                .await
681                .into_mas("writing compat sessions")?;
682
683            Ok((mas, state))
684        }
685        .instrument(tracing::info_span!("ingest_task")),
686    );
687
688    // In case this has an error, we still want to join the task, so we look at the
689    // error later
690    let res = synapse
691        .read_devices()
692        .map_err(|e| e.into_synapse("reading devices"))
693        .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
694        .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
695        .await;
696
697    let (mas, state) = task.await.into_join("device write task")??;
698
699    res?;
700
701    info!(
702        "{} devices migrated ({} skipped) in {:.1}s",
703        progress_counter_.migrated(),
704        progress_counter_.skipped(),
705        Instant::now().duration_since(start).as_secs_f64()
706    );
707
708    Ok((mas, state))
709}
710
711/// Migrates unrefreshable access tokens (those without an associated refresh
712/// token). Some of these may be deviceless.
713#[tracing::instrument(skip_all, level = Level::INFO)]
714async fn migrate_unrefreshable_access_tokens(
715    synapse: &mut SynapseReader<'_>,
716    mut mas: MasWriter,
717    clock: &dyn Clock,
718    rng: &mut impl RngCore,
719    mut state: MigrationState,
720    progress_counter: ProgressCounter,
721) -> Result<(MasWriter, MigrationState), Error> {
722    let start = Instant::now();
723    let progress_counter_ = progress_counter.clone();
724
725    let (tx, mut rx) = tokio::sync::mpsc::channel(100 * 1024);
726
727    let now = clock.now();
728    // create a new RNG seeded from the passed RNG so that we can move it into the
729    // spawned task
730    let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
731    let task = tokio::spawn(
732        async move {
733            let mut write_buffer = MasWriteBuffer::new(&mas);
734            let mut deviceless_session_write_buffer = MasWriteBuffer::new(&mas);
735
736            while let Some(token) = rx.recv().await {
737                let SynapseAccessToken {
738                    user_id: synapse_user_id,
739                    device_id,
740                    token,
741                    valid_until_ms,
742                    last_validated,
743                } = token;
744                let username = synapse_user_id
745                    .extract_localpart(&state.server_name)
746                    .into_extract_localpart(synapse_user_id.clone())?
747                    .to_owned();
748                let Some(user_infos) = state.users.get(username.as_str()).copied() else {
749                    return Err(Error::MissingUserFromDependentTable {
750                        table: "access_tokens".to_owned(),
751                        user: synapse_user_id,
752                    });
753                };
754
755                let Some(mas_user_id) = user_infos.mas_user_id else {
756                    progress_counter.increment_skipped();
757                    continue;
758                };
759
760                if user_infos.flags.is_deactivated()
761                    || user_infos.flags.is_guest()
762                    || user_infos.flags.is_appservice()
763                {
764                    progress_counter.increment_skipped();
765                    continue;
766                }
767
768                // It's not always accurate, but last_validated is *often* the creation time of
769                // the device If we don't have one, then use the current time as a
770                // fallback.
771                let created_at = last_validated.map_or_else(|| now, DateTime::from);
772
773                let session_id = if let Some(device_id) = device_id {
774                    // Use the existing device_id if this is the second token for a device
775                    *state
776                        .devices_to_compat_sessions
777                        .entry((mas_user_id, CompactString::new(&device_id)))
778                        .or_insert_with(|| {
779                            Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng))
780                        })
781                } else {
782                    // If this is a deviceless access token, create a deviceless compat session
783                    // for it (since otherwise we won't create one whilst migrating devices)
784                    let deviceless_session_id =
785                        Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
786
787                    deviceless_session_write_buffer
788                        .write(
789                            &mut mas,
790                            MasNewCompatSession {
791                                session_id: deviceless_session_id,
792                                user_id: mas_user_id,
793                                device_id: None,
794                                human_name: None,
795                                created_at,
796                                is_synapse_admin: false,
797                                last_active_at: None,
798                                last_active_ip: None,
799                                user_agent: None,
800                            },
801                        )
802                        .await
803                        .into_mas("failed to write deviceless compat sessions")?;
804
805                    deviceless_session_id
806                };
807
808                let token_id =
809                    Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
810
811                write_buffer
812                    .write(
813                        &mut mas,
814                        MasNewCompatAccessToken {
815                            token_id,
816                            session_id,
817                            access_token: token,
818                            created_at,
819                            expires_at: valid_until_ms.map(DateTime::from),
820                        },
821                    )
822                    .await
823                    .into_mas("writing compat access tokens")?;
824
825                progress_counter.increment_migrated();
826            }
827            write_buffer
828                .finish(&mut mas)
829                .await
830                .into_mas("writing compat access tokens")?;
831            deviceless_session_write_buffer
832                .finish(&mut mas)
833                .await
834                .into_mas("writing deviceless compat sessions")?;
835
836            Ok((mas, state))
837        }
838        .instrument(tracing::info_span!("ingest_task")),
839    );
840
841    // In case this has an error, we still want to join the task, so we look at the
842    // error later
843    let res = synapse
844        .read_unrefreshable_access_tokens()
845        .map_err(|e| e.into_synapse("reading tokens"))
846        .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
847        .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
848        .await;
849
850    let (mas, state) = task.await.into_join("token write task")??;
851
852    res?;
853
854    info!(
855        "{} non-refreshable access tokens migrated ({} skipped) in {:.1}s",
856        progress_counter_.migrated(),
857        progress_counter_.skipped(),
858        Instant::now().duration_since(start).as_secs_f64()
859    );
860
861    Ok((mas, state))
862}
863
864/// Migrates (access token, refresh token) pairs.
865/// Does not migrate non-refreshable access tokens.
866#[tracing::instrument(skip_all, level = Level::INFO)]
867async fn migrate_refreshable_token_pairs(
868    synapse: &mut SynapseReader<'_>,
869    mut mas: MasWriter,
870    clock: &dyn Clock,
871    rng: &mut impl RngCore,
872    mut state: MigrationState,
873    progress_counter: ProgressCounter,
874) -> Result<(MasWriter, MigrationState), Error> {
875    let start = Instant::now();
876    let progress_counter_ = progress_counter.clone();
877
878    let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseRefreshableTokenPair>(100 * 1024);
879
880    // create a new RNG seeded from the passed RNG so that we can move it into the
881    // spawned task
882    let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
883    let now = clock.now();
884    let task = tokio::spawn(
885        async move {
886            let mut access_token_write_buffer = MasWriteBuffer::new(&mas);
887            let mut refresh_token_write_buffer = MasWriteBuffer::new(&mas);
888
889            while let Some(token) = rx.recv().await {
890                let SynapseRefreshableTokenPair {
891                    user_id: synapse_user_id,
892                    device_id,
893                    access_token,
894                    refresh_token,
895                    valid_until_ms,
896                    last_validated,
897                } = token;
898
899                let username = synapse_user_id
900                    .extract_localpart(&state.server_name)
901                    .into_extract_localpart(synapse_user_id.clone())?
902                    .to_owned();
903                let Some(user_infos) = state.users.get(username.as_str()).copied() else {
904                    return Err(Error::MissingUserFromDependentTable {
905                        table: "refresh_tokens".to_owned(),
906                        user: synapse_user_id,
907                    });
908                };
909
910                let Some(mas_user_id) = user_infos.mas_user_id else {
911                    progress_counter.increment_skipped();
912                    continue;
913                };
914
915                if user_infos.flags.is_deactivated()
916                    || user_infos.flags.is_guest()
917                    || user_infos.flags.is_appservice()
918                {
919                    progress_counter.increment_skipped();
920                    continue;
921                }
922
923                // It's not always accurate, but last_validated is *often* the creation time of
924                // the device If we don't have one, then use the current time as a
925                // fallback.
926                let created_at = last_validated.map_or_else(|| now, DateTime::from);
927
928                // Use the existing device_id if this is the second token for a device
929                let session_id = *state
930                    .devices_to_compat_sessions
931                    .entry((mas_user_id, CompactString::new(&device_id)))
932                    .or_insert_with(|| {
933                        Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng))
934                    });
935
936                let access_token_id =
937                    Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
938                let refresh_token_id =
939                    Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
940
941                access_token_write_buffer
942                    .write(
943                        &mut mas,
944                        MasNewCompatAccessToken {
945                            token_id: access_token_id,
946                            session_id,
947                            access_token,
948                            created_at,
949                            expires_at: valid_until_ms.map(DateTime::from),
950                        },
951                    )
952                    .await
953                    .into_mas("writing compat access tokens")?;
954                refresh_token_write_buffer
955                    .write(
956                        &mut mas,
957                        MasNewCompatRefreshToken {
958                            refresh_token_id,
959                            session_id,
960                            access_token_id,
961                            refresh_token,
962                            created_at,
963                        },
964                    )
965                    .await
966                    .into_mas("writing compat refresh tokens")?;
967
968                progress_counter.increment_migrated();
969            }
970
971            access_token_write_buffer
972                .finish(&mut mas)
973                .await
974                .into_mas("writing compat access tokens")?;
975
976            refresh_token_write_buffer
977                .finish(&mut mas)
978                .await
979                .into_mas("writing compat refresh tokens")?;
980            Ok((mas, state))
981        }
982        .instrument(tracing::info_span!("ingest_task")),
983    );
984
985    // In case this has an error, we still want to join the task, so we look at the
986    // error later
987    let res = synapse
988        .read_refreshable_token_pairs()
989        .map_err(|e| e.into_synapse("reading refresh token pairs"))
990        .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
991        .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
992        .await;
993
994    let (mas, state) = task.await.into_join("refresh token write task")??;
995
996    res?;
997
998    info!(
999        "{} refreshable token pairs migrated ({} skipped) in {:.1}s",
1000        progress_counter_.migrated(),
1001        progress_counter_.skipped(),
1002        Instant::now().duration_since(start).as_secs_f64()
1003    );
1004
1005    Ok((mas, state))
1006}
1007
1008fn transform_user(
1009    user: &SynapseUser,
1010    server_name: &str,
1011    rng: &mut impl RngCore,
1012) -> Result<(MasNewUser, Option<MasNewUserPassword>), Error> {
1013    let username = user
1014        .name
1015        .extract_localpart(server_name)
1016        .into_extract_localpart(user.name.clone())?
1017        .to_owned();
1018
1019    let user_id = Uuid::from(Ulid::from_datetime_with_source(
1020        DateTime::<Utc>::from(user.creation_ts).into(),
1021        rng,
1022    ))
1023    .try_into()
1024    .expect("ULID generation lead to a nil UUID, this is a bug!");
1025
1026    let new_user = MasNewUser {
1027        user_id,
1028        username,
1029        created_at: user.creation_ts.into(),
1030        locked_at: user.locked.then_some(user.creation_ts.into()),
1031        deactivated_at: bool::from(user.deactivated).then_some(user.creation_ts.into()),
1032        can_request_admin: bool::from(user.admin),
1033        is_guest: bool::from(user.is_guest),
1034    };
1035
1036    let mas_password = user
1037        .password_hash
1038        .clone()
1039        .map(|password_hash| MasNewUserPassword {
1040            user_password_id: Uuid::from(Ulid::from_datetime_with_source(
1041                DateTime::<Utc>::from(user.creation_ts).into(),
1042                rng,
1043            )),
1044            user_id: new_user.user_id,
1045            hashed_password: password_hash,
1046            created_at: new_user.created_at,
1047        });
1048
1049    Ok((new_user, mas_password))
1050}