# Gen Statem to Tokio --- ## 1. Introduction to Erlang --- ### Erlang Functional programming language designed for concurrent, distributed systems > [!note] Erlang Details > > Erlang was created by Ericsson in the 1980s for building fault-tolerant telecommunications systems. --- ### BEAM Virtual machine that runs Erlang code > [!note] BEAM Details > > BEAM (Bogdan/Björn's Erlang Abstract Machine) is the virtual machine that executes Erlang bytecode. Although BEAM was created for Erlang, several other languages have been either created for it or ported to run on it. The most popular of these is Elixir. --- ### OTP Open Telecom Platform: Set of libraries and design principles > [!note] OTP Details > > OTP provides: > > - **Behaviors**: `gen_server`, `gen_statem` - reusable patterns > - **Supervision trees**: Hierarchical process supervision for fault tolerance > - **Hot code reloading**: Update code without stopping the system > - **Application framework**: Structured way to build and deploy applications --- ## Tokio Example 1: Simulate Erlang Process using Tasks and Channels [full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/01_basic_process.rs) --- ```rust let (tx, mut rx) = mpsc::channel(32); tokio::spawn(async move { while let Some(msg) = rx.recv().await { println!("Received: {:?}", msg); } }); tx.send("Hello".to_string()).await.unwrap(); ``` > [!note] Comparison of Tokio and Erlang > > - `tokio::sync::mpsc`: mailbox > - `tokio::spawn`: starts a process --- ## Gotchas - Mailbox vs Channel - Blocking Call - Supervisor > [!note] Workarounds > - `tokio:select!` on multiple queue of different priorities, or on cancellation token > - `tokio::sync::oneshot` for response, while Erlang reuse mailbox to receive response. > - `tokio::spawn` returns a join handle, which in turn returns an error when awaited and the task has exited because of panic. --- ## 2. Overview of gen_statem --- ### Behavior > Behavior = Engine + Callback > [!note] > > - Engine implements the state machine pattern > - Callback contains state-specific logic --- ### Strategy Pattern ``` gen_statem module Callback module ----------------- --------------- gen_statem:start gen_statem:start_monitor gen_statem:start_link -----> Module:init/1 gen_statem:stop Supervisor exit Callback failure -----> Module:terminate/3 gen_statem:send_request gen_statem:call gen_statem:cast -----> Module:StateName/3 or -----> Module:handle_event/4 ``` --- ### History `gen_fsm` → `gen_statem` > [!note] > `gen_statem` replaces `gen_fsm` since Erlang/OTP 20.0 --- ### Transition ```text State(S) x Event(E) -> Actions(A), State(S') ``` --- ### Expanded ```text State(S) x Data(D) x EventType(T) x Event(E) -> Actions(A), State(S'), Data(D') ``` --- ### Transition Event Handler ```erlang Module:StateName(EventType, Event, Data) %% Module:handle_event(EventType, Event, 'StateName', Data) Module:handle_event(EventType, Event, State, Data) -> { NextState, NewData, Actions } ``` --- ### Event Type: cast Asynchronous message (fire-and-forget) ``` erlang gen_statem:cast(Ref, Msg) -> handle_event(cast, Msg, State, Data) ``` > [!note] cast Events > > `cast` events: > > - No reply expected > - Non-blocking > - Use for notifications, commands that don't need confirmation --- ### Event Type: call Synchronous request (expects reply) ``` erlang gen_statem:call(Ref, Request) -> handle_event({call, From}, Request, State, Data) ``` > [!note] call Events > > `{call, From}` events: > > - Blocks until reply > - `From` contains reply address > - Use for requests that need a response --- ## Tokio Example 2: State x Event → NewState [full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/02_state_x_event_to_new_state.rs) --- ### State ```rust #[derive(Copy, Clone, Debug)] enum Switch { On, Off, } ``` --- ### Event ```rust enum Event { Click, // Cast Get(oneshot::Sender<Switch>), // Call } ``` --- ### Event Handler ```rust // self: State async fn handle_event(self, event: Self::Event) -> Self; ``` --- ### Engine ```rust // self: State async fn run(mut self, mut event_rx: mpsc::Receiver<Self::Event>) { while let Some(event) = event_rx.recv().await { self = self.handle_event(event).await; } } ``` --- ### Callback ```rust async fn handle_event(self, event: Self::Event) -> Self { match (self, event) { (Switch::On, Event::Click) => Switch::Off, (Switch::Off, Event::Click) => Switch::On, (_, Event::Get(sender)) => { let _ = sender.send(self); self } } } ``` --- ## 3. Entering New State --- ### When to Execute Business Logic - When events occur - When entering new states > [!note] > If we have multiple paths to enter a new states, it's simpler to run some code when entering the new state. --- ### State Change: S' ≠ S --- ### Everything is Event --- ### Event Type: enter - Opt-in feature - Triggered when `NewState != State` - Or ... --- ### New State Shortcuts - `{next_state, NextState, NewData}` - `{keep_state, NewData}` - `keep_state_and_data` - <mark style="background: #F9CACA;">`{repeat_state, NewData}`</mark> --- ## Tokio Example 3: Entering State [full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/03_entering_state.rs) --- ### State ```rust #[derive(Copy, Clone, Debug, Eq, PartialEq)] enum Switch { On, Off, } ``` > [!note] > `Eq` is required to check whether State has changed. --- ### Event ```rust enum Event<S, T> { External(T), // Entered a new state. The payload is the old state. Enter(S), } ``` > [!note] > We need a new branch to indate the new `Enter` event. --- ### Select an Event --- ```rust struct Mailbox<T> { // Events that sent via channel rx: mpsc::Receiver<T>, // The pending `Enter` event queue: Option<T>, } ``` --- ```rust async fn recv(&mut self) -> Option<T> { if self.queue.is_some() { self.queue.take() } else { self.rx.recv().await } } ``` --- ### Engine ```rust async fn run(mut self, event_rx: Receiver) { let mut mailbox = Mailbox::new(event_rx); if Self::state_enter() { // First `enter` event mailbox.push(Event::Enter(self.clone())) } ... } ``` --- ```rust async fn run(mut self, event_rx: Receiver) { let mut mailbox = Mailbox::new(event_rx); ... let mut old_state = self.clone(); while let Some(event) = mailbox.recv().await { self = self.handle_event(event).await; if Self::state_enter() && self != old_state { mailbox.push(Event::Enter(old_state)); old_state = self.clone() } } } ``` --- ## 4. Repeat State Trigger `Enter` when S = S' --- `{repeat_state, NewData}` --- ## Tokio Example 4: Repeat State [full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/04_repeat_state.rs) --- ### Tagged State Returned from Event Handler ```rust enum Transition<S> { NewState(S), RepeatState(S), } ``` --- ```diff while let Some(event) = mailbox.recv().await { - self = self.handle_event(event).await; + let transition = self.handle_event(event).await; + let is_repeat = transition.is_repeat(); + self = transition.into_state(); - if Self::state_enter() && self != old_state { + if Self::state_enter() && (is_repeat || self != old_state) { mailbox.push(Event::Enter(old_state)); old_state = self.clone() } } ``` --- ### Force Entering State ```rust match (self, event) { (On, External(Click)) => Transition::NewState(Off), (Off, External(Click)) => Transition::NewState(On), (_, External(TurnOn)) => Transition::RepeatState(On), (On, Enter(_)) => { println!("Switched ON"); Transition::NewState(self) } ... } ``` --- ## 5. State Data > [!note] > > Attach data that can be changed without entering new state --- ### Separate Structure ```rust enum State { On, Off, } struct Data { counter: u64 } ``` --- ### Flatten Structure ```rust enum Status { On, Off, } struct State { // This is what Erlang called State status: Status, // Remaining fields are data counter: u64, } ``` > [!note] > Common in database. Just need a trait method to compare that two states have the the same status. --- ## Tokio Example 5: State Data [full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/05_state_data.rs) --- ### Event Handler ```rust async fn handle_event( self, data: Self::Data, event: Event<Self, Self::ExternalEvent>, ) -> (Transition<Self>, Self::Data); ``` --- ### Count How Many Times Been Turned On --- #### Data ```rust type Data = u64; ``` --- #### Callback ```rust (Switch::On, Event::Enter(_)) => { println!("Switched ON"); (Transition::NewState(self), data + 1) } ``` --- ## 6. Postpone Action --- Callback instructs Engine to perform Actions --- **Postpone Action** leaves the Event to a later State > [!note] > When events are important but we cannot process them yet. --- ## Tokio Example 6: Postpone Action [full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/06_postpone.rs) --- ### Actions ```rust struct Actions<E> { // Postponed external event postpone: Option<E>, } ``` > [!note] Actions Builder > Add builder to create Actions --- ### Event Handler ```rust async fn handle_event( self, data: Self::Data, event: Event<Self, Self::ExternalEvent>, ) -> (Transition<Self>, Self::Data, Actions<Self::ExternalEvent>) ``` --- ### State ```rust #[derive(Copy, Clone, Debug, Eq, PartialEq)] enum Switch { On, Off, Maintenance, } ``` --- ### Postpone Events in Maintenance Mode ```rust (Switch::Maintenance, Event::External(event)) => ( Transition::NewState(Switch::Maintenance), data, ActionsBuilder::new().postpone(event).build(), ), ``` --- ### Engine Queues Postponed Events ```rust struct Mailbox<T> { rx: mpsc::Receiver<T>, queue: VecDeque<T>, postponed: Vec<T>, } ``` --- ### Re-Add Events When Entering New State ```rust fn enter_new_state(&mut self) { self.queue.extend(self.postponed.drain(..)); } ``` --- ### Engine Handles Postpone Action ```rust while let Some(event) = mailbox.recv().await { let (transition, new_data, actions) = self.handle_event( data, event).await; ... if let Some(event) = actions.postpone { mailbox.postpone(Event::External(event)); } if Self::state_enter() && (is_repeat || self != old_state) { mailbox.enter_new_state(); mailbox.push(Event::Enter(old_state)); old_state = self.clone() } } ``` --- ## 7. Timeout Action --- Timeout is Event --- - Event Timeout: Cleared on any event - State Timeout: Cleared on entering new state - Named Timeout: Manual Clearing --- ## Tokio Example 7: State Timeout Auto Turn Off [full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/07_state_timeout.rs) --- ### Event ```rust enum Event<S, E> { External(E), Enter(S), StateTimeout, } ``` --- ### Let Mainbox Generates Timeout Event ```rust struct Mailbox<S, E> { rx: mpsc::Receiver<Event<S, E>>, deadline: Option<Instant>, queue: VecDeque<Event<S, E>>, postponed: Vec<Event<S, E>>, } ``` --- ```rust match self.deadline { Some(deadline) => { tokio::select! { event = self.rx.recv() => event, _ = sleep_until(deadline) => { self.deadline = None; Some(Event::StateTimeout) } } } None => self.rx.recv().await, } ``` --- ```rust fn enter_new_state(&mut self) { // Auto clear on entering new state self.deadline = None; self.queue.extend(self.postponed.drain(..)); } ``` --- ## 8. Inserted Events Action --- ### Use Cases - Preprocess - State Machines Composition --- #### Use Case: Preprocess Combine Key Down and Key Up into Key Press Event --- ## Tokio Example 8: Inserted Events [full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/08_inserted_events.rs) --- ### Design Preprocess `TurnOn` and `Click` and generate internal `TurnOn`, `TurnOff` --- ### Differences - Erlang allow insert any type of events - This example only inserts internal events --- ### Event ```rust enum Event<S, E, I> { External(E), Enter(S), StateTimeout, Internal(I), } ``` --- ### Callback ```rust (Switch::On, Event::External(ExternalEvent::Click)) => ( Transition::NewState(self), data, ActionsBuilder::new() .new_event(InternalEvent::TurnOff) .build(), ), ``` --- ```rust (Switch::Off, Event::External(ExternalEvent::Click)) => ( Transition::NewState(self), data, ActionsBuilder::new() .new_event(InternalEvent::TurnOn) .build(), ), ``` --- ```rust (Switch::On | Switch::Off, Event::External(TurnOn)) => ( Transition::NewState(self), data, ActionsBuilder::new() .new_event(InternalEvent::TurnOn) .build(), ), ``` --- ### Internal Event Handler ```rust (_, Event::Internal(InternalEvent::TurnOn)) => ( Transition::RepeatState(Switch::On), data, Actions::default(), ), (_, Event::Internal(InternalEvent::TurnOff)) => { (Transition::NewState(Switch::Off), data, Actions::default()) } ``` --- ### Engine ```rust for internal_event in actions.new_events { mailbox.push(Event::Internal(internal_event)); } ``` --- ## Recap --- Behavior = Engine + Callback --- State(S) x Event(E) -> Actions(A), State(S') --- Everything is Event - Enter - Timeout --- Useful Actions - Postpone - Timeouts - Inserted Event