// mas_handlers/activity_tracker/mod.rs
mod bound;
8mod worker;
9
10use std::net::IpAddr;
11
12use chrono::{DateTime, Utc};
13use mas_data_model::{
14    BrowserSession, Clock, CompatSession, Session, personal::session::PersonalSession,
15};
16use mas_storage::BoxRepositoryFactory;
17use tokio_util::{sync::CancellationToken, task::TaskTracker};
18use ulid::Ulid;
19
20pub use self::bound::Bound;
21use self::worker::Worker;
22
/// Capacity of the bounded channel that carries [`Message`]s from the
/// [`ActivityTracker`] handles to the worker task.
///
/// Once this many messages are queued, senders wait for the worker to catch
/// up before their message is accepted.
const MESSAGE_QUEUE_SIZE: usize = 1000;
24
/// The kind of session an activity record refers to.
///
/// The variant order is significant: the derived `PartialOrd` compares
/// variants in declaration order, so do not reorder them casually.
#[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Hash)]
enum SessionKind {
    /// Recorded through `ActivityTracker::record_oauth2_session`.
    OAuth2,
    /// Recorded through `ActivityTracker::record_compat_session`.
    Compat,
    /// Recorded through `ActivityTracker::record_personal_session`.
    Personal,
    /// Recorded through `ActivityTracker::record_browser_session`.
    Browser,
}
33
34impl SessionKind {
35    const fn as_str(self) -> &'static str {
36        match self {
37            SessionKind::OAuth2 => "oauth2",
38            SessionKind::Compat => "compat",
39            SessionKind::Personal => "personal",
40            SessionKind::Browser => "browser",
41        }
42    }
43}
44
45enum Message {
46    Record {
47        kind: SessionKind,
48        id: Ulid,
49        date_time: DateTime<Utc>,
50        ip: Option<IpAddr>,
51    },
52    Flush(tokio::sync::oneshot::Sender<()>),
53}
54
55#[derive(Clone)]
56pub struct ActivityTracker {
57    channel: tokio::sync::mpsc::Sender<Message>,
58}
59
60impl ActivityTracker {
61    #[must_use]
67    pub fn new(
68        repository_factory: BoxRepositoryFactory,
69        flush_interval: std::time::Duration,
70        task_tracker: &TaskTracker,
71        cancellation_token: CancellationToken,
72    ) -> Self {
73        let worker = Worker::new(repository_factory);
74        let (sender, receiver) = tokio::sync::mpsc::channel(MESSAGE_QUEUE_SIZE);
75        let tracker = ActivityTracker { channel: sender };
76
77        task_tracker.spawn(
79            tracker
80                .clone()
81                .flush_loop(flush_interval, cancellation_token.clone()),
82        );
83        task_tracker.spawn(worker.run(receiver, cancellation_token));
84
85        tracker
86    }
87
88    #[must_use]
90    pub fn bind(self, ip: Option<IpAddr>) -> Bound {
91        Bound::new(self, ip)
92    }
93
94    pub async fn record_oauth2_session(
96        &self,
97        clock: &dyn Clock,
98        session: &Session,
99        ip: Option<IpAddr>,
100    ) {
101        let res = self
102            .channel
103            .send(Message::Record {
104                kind: SessionKind::OAuth2,
105                id: session.id,
106                date_time: clock.now(),
107                ip,
108            })
109            .await;
110
111        if let Err(e) = res {
112            tracing::error!("Failed to record OAuth2 session: {}", e);
113        }
114    }
115
116    pub async fn record_personal_session(
118        &self,
119        clock: &dyn Clock,
120        session: &PersonalSession,
121        ip: Option<IpAddr>,
122    ) {
123        let res = self
124            .channel
125            .send(Message::Record {
126                kind: SessionKind::Personal,
127                id: session.id,
128                date_time: clock.now(),
129                ip,
130            })
131            .await;
132
133        if let Err(e) = res {
134            tracing::error!("Failed to record Personal session: {}", e);
135        }
136    }
137
138    pub async fn record_compat_session(
140        &self,
141        clock: &dyn Clock,
142        compat_session: &CompatSession,
143        ip: Option<IpAddr>,
144    ) {
145        let res = self
146            .channel
147            .send(Message::Record {
148                kind: SessionKind::Compat,
149                id: compat_session.id,
150                date_time: clock.now(),
151                ip,
152            })
153            .await;
154
155        if let Err(e) = res {
156            tracing::error!("Failed to record compat session: {}", e);
157        }
158    }
159
160    pub async fn record_browser_session(
162        &self,
163        clock: &dyn Clock,
164        browser_session: &BrowserSession,
165        ip: Option<IpAddr>,
166    ) {
167        let res = self
168            .channel
169            .send(Message::Record {
170                kind: SessionKind::Browser,
171                id: browser_session.id,
172                date_time: clock.now(),
173                ip,
174            })
175            .await;
176
177        if let Err(e) = res {
178            tracing::error!("Failed to record browser session: {}", e);
179        }
180    }
181
182    pub async fn flush(&self) {
184        let (tx, rx) = tokio::sync::oneshot::channel();
185        let res = self.channel.send(Message::Flush(tx)).await;
186
187        match res {
188            Ok(()) => {
189                if let Err(e) = rx.await {
190                    tracing::error!(
191                        error = &e as &dyn std::error::Error,
192                        "Failed to flush activity tracker"
193                    );
194                }
195            }
196            Err(e) => {
197                tracing::error!(
198                    error = &e as &dyn std::error::Error,
199                    "Failed to flush activity tracker"
200                );
201            }
202        }
203    }
204
205    async fn flush_loop(
207        self,
208        interval: std::time::Duration,
209        cancellation_token: CancellationToken,
210    ) {
211        let _guard = cancellation_token.clone().drop_guard();
214        let mut interval = tokio::time::interval(interval);
215        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
216
217        loop {
218            tokio::select! {
219                biased;
220
221                () = cancellation_token.cancelled() => {
222                    return;
224                }
225
226                () = self.channel.closed() => {
228                    return;
230                }
231
232
233                _ = interval.tick() => {
234                    self.flush().await;
235                }
236            }
237        }
238    }
239}