# Gen Statem to Tokio
---
## 1. Introduction to Erlang
---
### Erlang
Functional programming language designed for concurrent, distributed systems
> [!note] Erlang Details
>
> Erlang was created by Ericsson in the 1980s for building fault-tolerant telecommunications systems.
---
### BEAM
Virtual machine that runs Erlang code
> [!note] BEAM Details
>
> BEAM (Bogdan/Björn's Erlang Abstract Machine) is the virtual machine that executes Erlang bytecode. Although BEAM was created for Erlang, several other languages have been either created for it or ported to run on it. The most popular of these is Elixir.
---
### OTP
Open Telecom Platform: Set of libraries and design principles
> [!note] OTP Details
>
> OTP provides:
>
> - **Behaviors**: `gen_server`, `gen_statem` - reusable patterns
> - **Supervision trees**: Hierarchical process supervision for fault tolerance
> - **Hot code reloading**: Update code without stopping the system
> - **Application framework**: Structured way to build and deploy applications
---
## Tokio Example 1: Simulate Erlang Process using Tasks and Channels
[full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/01_basic_process.rs)
---
```rust
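use tokio::sync::mpsc;

// The channel plays the role of the mailbox.
let (tx, mut rx) = mpsc::channel(32);
// The spawned task plays the role of the process.
tokio::spawn(async move {
    while let Some(msg) = rx.recv().await {
        println!("Received: {:?}", msg);
    }
});
tx.send("Hello".to_string()).await.unwrap();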
```
> [!note] Comparison of Tokio and Erlang
>
> - `tokio::sync::mpsc`: mailbox
> - `tokio::spawn`: starts a process
---
## Gotchas
- Mailbox vs Channel
- Blocking Call
- Supervisor
> [!note] Workarounds
> - `tokio::select!` over multiple queues with different priorities, or over a cancellation token
> - `tokio::sync::oneshot` for the response, whereas Erlang reuses the mailbox to receive it
> - `tokio::spawn` returns a join handle; awaiting it yields an error if the task exited because of a panic
---
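The supervisor workaround above can be sketched without Tokio. This is a minimal sketch that assumes `std::thread` in place of Tokio tasks so that it runs without dependencies; `supervise` is a hypothetical helper, not part of Tokio or OTP. `JoinHandle::join` reports a panic as `Err`, which is the same signal an awaited Tokio join handle gives.

```rust
use std::thread;

/// Hypothetical helper: runs `worker` on a thread and restarts it after a
/// panic, at most `max_restarts` times. Returns how many restarts were
/// needed, or `None` if the worker panicked on every attempt.
fn supervise<F>(max_restarts: u32, worker: F) -> Option<u32>
where
    F: Fn(u32) + Send + Clone + 'static,
{
    for attempt in 0..=max_restarts {
        let w = worker.clone();
        // `join` returns `Err` when the thread panicked.
        if thread::spawn(move || w(attempt)).join().is_ok() {
            return Some(attempt);
        }
    }
    None
}
```

Unlike an OTP supervisor, this loop knows nothing about restart strategies or restart intensity over time; it only shows where the panic becomes visible.

---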
## 2. Overview of gen_statem
---
### Behavior
> Behavior = Engine + Callback
> [!note]
>
> - Engine implements the state machine pattern
> - Callback contains state-specific logic
---
### Strategy Pattern
```
gen_statem module Callback module
----------------- ---------------
gen_statem:start
gen_statem:start_monitor
gen_statem:start_link -----> Module:init/1
gen_statem:stop
Supervisor exit
Callback failure -----> Module:terminate/3
gen_statem:send_request
gen_statem:call
gen_statem:cast -----> Module:StateName/3
or -----> Module:handle_event/4
```
---
### History
`gen_fsm` → `gen_statem`
> [!note]
> `gen_statem` has replaced `gen_fsm` since Erlang/OTP 20.0
---
### Transition
```text
State(S) x Event(E) -> Actions(A), State(S')
```
---
### Expanded
```text
State(S) x Data(D) x EventType(T) x Event(E)
-> Actions(A), State(S'), Data(D')
```
---
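The expanded signature can be read as a pure function. The sketch below is one possible Rust rendering; the names `transition`, `Action::Reply`, and the click-counter `Data` are illustrative assumptions, not taken from `gen_statem` or from the linked examples.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum State { Off, On }

#[derive(Debug, Clone, Copy, PartialEq)]
enum EventType { Cast, Call }

#[derive(Debug, Clone, Copy, PartialEq)]
enum Event { Click, Get }

#[derive(Debug, PartialEq)]
enum Action { Reply(State) }

/// Data: number of clicks handled so far (assumption for this sketch).
type Data = u64;

/// State(S) x Data(D) x EventType(T) x Event(E) -> Actions(A), State(S'), Data(D')
fn transition(
    state: State,
    data: Data,
    event_type: EventType,
    event: Event,
) -> (Vec<Action>, State, Data) {
    match (state, event_type, event) {
        (State::Off, EventType::Cast, Event::Click) => (vec![], State::On, data + 1),
        (State::On, EventType::Cast, Event::Click) => (vec![], State::Off, data + 1),
        (s, EventType::Call, Event::Get) => (vec![Action::Reply(s)], s, data),
        // Any other combination is ignored: no actions, nothing changes.
        (s, _, _) => (vec![], s, data),
    }
}
```

---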
### Transition Event Handler
```erlang
Module:StateName(EventType, Event, Data)
%% Module:handle_event(EventType, Event, 'StateName', Data)
Module:handle_event(EventType, Event, State, Data)
-> { NextState, NewData, Actions }
```
---
### Event Type: cast
Asynchronous message (fire-and-forget)
``` erlang
gen_statem:cast(Ref, Msg)
-> handle_event(cast, Msg, State, Data)
```
> [!note] cast Events
>
> `cast` events:
>
> - No reply expected
> - Non-blocking
> - Use for notifications, commands that don't need confirmation
---
### Event Type: call
Synchronous request (expects reply)
``` erlang
gen_statem:call(Ref, Request)
-> handle_event({call, From}, Request, State, Data)
```
> [!note] call Events
>
> `{call, From}` events:
>
> - Blocks until reply
> - `From` contains reply address
> - Use for requests that need a response
---
## Tokio Example 2: State x Event → NewState
[full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/02_state_x_event_to_new_state.rs)
---
### State
```rust
#[derive(Copy, Clone, Debug)]
enum Switch {
    On,
    Off,
}
```
---
### Event
```rust
enum Event {
    Click, // Cast
    Get(oneshot::Sender<Switch>), // Call
}
```
---
### Event Handler
```rust
// self: State
async fn handle_event(self, event: Self::Event) -> Self;
```
---
### Engine
```rust
// self: State
async fn run(mut self, mut event_rx: mpsc::Receiver<Self::Event>) {
    while let Some(event) = event_rx.recv().await {
        self = self.handle_event(event).await;
    }
}
```
---
### Callback
```rust
async fn handle_event(self, event: Self::Event) -> Self {
    match (self, event) {
        (Switch::On, Event::Click) => Switch::Off,
        (Switch::Off, Event::Click) => Switch::On,
        (_, Event::Get(sender)) => {
            let _ = sender.send(self);
            self
        }
    }
}
```
---
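The pieces of Example 2 fit together as follows. This sketch assumes `std::thread` and `std::sync::mpsc` instead of Tokio so that it runs without dependencies; an `mpsc::Sender` stands in for `oneshot::Sender` as the reply address, and `start` and `get` are hypothetical helper names.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Copy, Clone, Debug, PartialEq)]
enum Switch { On, Off }

enum Event {
    Click,                     // cast
    Get(mpsc::Sender<Switch>), // call: carries the reply address
}

// Callback: state-specific logic.
fn handle_event(state: Switch, event: Event) -> Switch {
    match (state, event) {
        (Switch::On, Event::Click) => Switch::Off,
        (Switch::Off, Event::Click) => Switch::On,
        (s, Event::Get(reply)) => {
            let _ = reply.send(s);
            s
        }
    }
}

// Engine: owns the mailbox and feeds events to the callback.
fn start(initial: Switch) -> mpsc::Sender<Event> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut state = initial;
        // The loop ends once every sender has been dropped.
        for event in rx {
            state = handle_event(state, event);
        }
    });
    tx
}

// Blocking call: send the request, then wait for the reply.
fn get(switch: &mpsc::Sender<Event>) -> Switch {
    let (reply_tx, reply_rx) = mpsc::channel();
    switch.send(Event::Get(reply_tx)).expect("engine stopped");
    reply_rx.recv().expect("engine dropped the reply sender")
}
```

Because the mailbox is processed in order, a `get` issued after a `Click` from the same sender always observes the toggled state.

---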
## 3. Entering New State
---
### When to Execute Business Logic
- When events occur
- When entering new states
> [!note]
> If there are multiple paths into a new state, it is simpler to run the code when entering that state.
---
### State Change: S' ≠ S
---
### Everything is Event
---
### Event Type: enter
- Opt-in feature
- Triggered when `NewState != State`
- Or ...
---
### New State Shortcuts
- `{next_state, NextState, NewData}`
- `{keep_state, NewData}`
- `keep_state_and_data`
- <mark style="background: #F9CACA;">`{repeat_state, NewData}`</mark>
---
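One way to read the four return shapes is as a small Rust enum. The sketch below uses hypothetical names (`Next`, `resolve`) and assumes state enter calls are switched on; it shows which shapes end up firing an `enter` event.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum State { Off, On }

type Data = u64;

/// Hypothetical Rust counterpart of the four return shapes.
enum Next {
    NextState(State, Data),
    KeepState(Data),
    KeepStateAndData,
    RepeatState(Data),
}

/// Returns the resulting state, the resulting data, and whether an
/// `enter` event fires.
fn resolve(state: State, data: Data, next: Next) -> (State, Data, bool) {
    match next {
        // Fires only when the state actually changes.
        Next::NextState(s, d) => (s, d, s != state),
        Next::KeepState(d) => (state, d, false),
        Next::KeepStateAndData => (state, data, false),
        // Same state, but `enter` fires anyway.
        Next::RepeatState(d) => (state, d, true),
    }
}
```

---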
## Tokio Example 3: Entering State
[full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/03_entering_state.rs)
---
### State
```rust
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum Switch {
    On,
    Off,
}
```
> [!note]
> `Eq` is required to check whether State has changed.
---
### Event
```rust
enum Event<S, T> {
    External(T),
    // Entered a new state. The payload is the old state.
    Enter(S),
}
```
> [!note]
> We need a new variant to indicate the `Enter` event.
---
### Select an Event
---
```rust
struct Mailbox<T> {
    // Events sent via the channel
    rx: mpsc::Receiver<T>,
    // The pending `Enter` event
    queue: Option<T>,
}
```
---
```rust
async fn recv(&mut self) -> Option<T> {
    if self.queue.is_some() {
        self.queue.take()
    } else {
        self.rx.recv().await
    }
}
```
---
### Engine
```rust
async fn run(mut self, event_rx: Receiver) {
    let mut mailbox = Mailbox::new(event_rx);
    if Self::state_enter() {
        // First `enter` event
        mailbox.push(Event::Enter(self.clone()))
    }
    ...
}
```
---
```rust
async fn run(mut self, event_rx: Receiver) {
    let mut mailbox = Mailbox::new(event_rx);
    ...
    let mut old_state = self.clone();
    while let Some(event) = mailbox.recv().await {
        self = self.handle_event(event).await;
        if Self::state_enter() && self != old_state {
            mailbox.push(Event::Enter(old_state));
            old_state = self.clone()
        }
    }
}
```
---
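The ordering of `Enter` events is easier to see in a synchronous model. The sketch below is an assumption-laden simplification: it replaces the channel with a `VecDeque`, always has state enter switched on, and uses a hypothetical `run` that returns every event the handler saw.

```rust
use std::collections::VecDeque;

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum Switch { On, Off }

#[derive(Debug, PartialEq)]
enum Event {
    Click,
    // Entered a new state. The payload is the old state.
    Enter(Switch),
}

fn handle_event(state: Switch, event: &Event) -> Switch {
    match (state, event) {
        (Switch::On, Event::Click) => Switch::Off,
        (Switch::Off, Event::Click) => Switch::On,
        // Handling `Enter` does not change the state.
        (s, Event::Enter(_)) => s,
    }
}

/// Feeds `clicks` external clicks through the engine and returns every
/// event the handler saw, in order.
fn run(initial: Switch, clicks: usize) -> Vec<Event> {
    let mut external: VecDeque<Event> = (0..clicks).map(|_| Event::Click).collect();
    // First `enter` event; its payload is the initial state itself.
    let mut pending_enter = Some(Event::Enter(initial));
    let (mut state, mut old_state) = (initial, initial);
    let mut seen = Vec::new();
    // A pending `Enter` always wins over the next external event.
    while let Some(event) = pending_enter.take().or_else(|| external.pop_front()) {
        state = handle_event(state, &event);
        seen.push(event);
        if state != old_state {
            pending_enter = Some(Event::Enter(old_state));
            old_state = state;
        }
    }
    seen
}
```

With two clicks starting from `Off`, the handler sees `Enter(Off)`, `Click`, `Enter(Off)`, `Click`, `Enter(On)`: each state change is followed immediately by its `Enter` event.

---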
## 4. Repeat State
Trigger `Enter` when S = S'
---
`{repeat_state, NewData}`
---
## Tokio Example 4: Repeat State
[full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/04_repeat_state.rs)
---
### Tagged State Returned from Event Handler
```rust
enum Transition<S> {
    NewState(S),
    RepeatState(S),
}
```
---
```diff
 while let Some(event) = mailbox.recv().await {
-    self = self.handle_event(event).await;
+    let transition = self.handle_event(event).await;
+    let is_repeat = transition.is_repeat();
+    self = transition.into_state();
-    if Self::state_enter() && self != old_state {
+    if Self::state_enter() && (is_repeat || self != old_state) {
         mailbox.push(Event::Enter(old_state));
         old_state = self.clone()
     }
 }
```
---
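The diff above calls `is_repeat` and `into_state` on the transition. A minimal sketch of how those two helpers might look follows; `should_enter` is a hypothetical function that isolates the enter check so it can be tried on its own.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Transition<S> {
    NewState(S),
    RepeatState(S),
}

impl<S> Transition<S> {
    fn is_repeat(&self) -> bool {
        matches!(self, Transition::RepeatState(_))
    }

    fn into_state(self) -> S {
        match self {
            Transition::NewState(s) | Transition::RepeatState(s) => s,
        }
    }
}

/// Returns the next state and whether an `Enter` event should be queued.
fn should_enter<S: PartialEq>(old_state: &S, transition: Transition<S>) -> (S, bool) {
    let is_repeat = transition.is_repeat();
    let state = transition.into_state();
    let enter = is_repeat || state != *old_state;
    (state, enter)
}
```

---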
### Force Entering State
```rust
match (self, event) {
    (On, External(Click)) => Transition::NewState(Off),
    (Off, External(Click)) => Transition::NewState(On),
    (_, External(TurnOn)) => Transition::RepeatState(On),
    (On, Enter(_)) => {
        println!("Switched ON");
        Transition::NewState(self)
    }
    ...
}
```
---
## 5. State Data
> [!note]
>
> Attach data that can change without entering a new state
---
### Separate Structure
```rust
enum State {
    On,
    Off,
}
struct Data {
    counter: u64
}
```
---
### Flatten Structure
```rust
enum Status {
    On,
    Off,
}
struct State {
    // This is what Erlang calls State
    status: Status,
    // Remaining fields are data
    counter: u64,
}
```
> [!note]
> Common in databases. We just need a trait method to check whether two states have the same status.
---
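For the flattened structure, the comparison method might look like the sketch below. The trait name `SameStatus` is a hypothetical choice; the point is that the engine compares only the status, while full equality still includes the data.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status { On, Off }

#[derive(Debug, Clone, PartialEq)]
struct State {
    status: Status,
    counter: u64,
}

/// Hypothetical trait the engine would use instead of `==` when deciding
/// whether a new state has been entered.
trait SameStatus {
    fn same_status(&self, other: &Self) -> bool;
}

impl SameStatus for State {
    fn same_status(&self, other: &Self) -> bool {
        // Only the status counts; `counter` is data.
        self.status == other.status
    }
}
```

---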
## Tokio Example 5: State Data
[full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/05_state_data.rs)
---
### Event Handler
```rust
async fn handle_event(
    self,
    data: Self::Data,
    event: Event<Self, Self::ExternalEvent>,
) -> (Transition<Self>, Self::Data);
```
---
### Count How Many Times the Switch Has Been Turned On
---
#### Data
```rust
type Data = u64;
```
---
#### Callback
```rust
(Switch::On, Event::Enter(_)) => {
    println!("Switched ON");
    (Transition::NewState(self), data + 1)
}
```
---
## 6. Postpone Action
---
Callback instructs Engine to perform Actions
---
**Postpone Action** leaves the Event to a later State
> [!note]
> When events are important but we cannot process them yet.
---
## Tokio Example 6: Postpone Action
[full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/06_postpone.rs)
---
### Actions
```rust
struct Actions<E> {
    // Postponed external event
    postpone: Option<E>,
}
```
> [!note] Actions Builder
> Add builder to create Actions
---
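A builder for `Actions` could be as small as the sketch below. The method names follow how `ActionsBuilder` is used later in these slides; the bodies are an assumption and may differ from the linked full code.

```rust
#[derive(Debug, PartialEq)]
struct Actions<E> {
    // Postponed external event
    postpone: Option<E>,
}

// Manual impl so that `E` itself does not have to implement `Default`.
impl<E> Default for Actions<E> {
    fn default() -> Self {
        Actions { postpone: None }
    }
}

struct ActionsBuilder<E> {
    actions: Actions<E>,
}

impl<E> ActionsBuilder<E> {
    fn new() -> Self {
        ActionsBuilder { actions: Actions::default() }
    }

    /// Asks the engine to postpone `event` until a new state is entered.
    fn postpone(mut self, event: E) -> Self {
        self.actions.postpone = Some(event);
        self
    }

    fn build(self) -> Actions<E> {
        self.actions
    }
}
```

---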
### Event Handler
```rust
async fn handle_event(
    self,
    data: Self::Data,
    event: Event<Self, Self::ExternalEvent>,
) -> (Transition<Self>, Self::Data, Actions<Self::ExternalEvent>);
```
---
### State
```rust
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum Switch {
    On,
    Off,
    Maintenance,
}
```
---
### Postpone Events in Maintenance Mode
```rust
(Switch::Maintenance, Event::External(event)) => (
    Transition::NewState(Switch::Maintenance),
    data,
    ActionsBuilder::new().postpone(event).build(),
),
```
---
### Engine Queues Postponed Events
```rust
struct Mailbox<T> {
    rx: mpsc::Receiver<T>,
    queue: VecDeque<T>,
    postponed: Vec<T>,
}
```
---
### Re-Add Events When Entering New State
```rust
fn enter_new_state(&mut self) {
    self.queue.extend(self.postponed.drain(..));
}
```
---
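The postponed-event bookkeeping can be tried in isolation. This sketch leaves out the channel (`rx`) and models only the two in-memory queues; `new`, `push`, `postpone`, and `pop` are assumed method names.

```rust
use std::collections::VecDeque;

struct Mailbox<T> {
    queue: VecDeque<T>,
    postponed: Vec<T>,
}

impl<T> Mailbox<T> {
    fn new() -> Self {
        Mailbox { queue: VecDeque::new(), postponed: Vec::new() }
    }

    fn push(&mut self, event: T) {
        self.queue.push_back(event);
    }

    /// Parks an event; it is invisible to `pop` until a new state is entered.
    fn postpone(&mut self, event: T) {
        self.postponed.push(event);
    }

    /// Moves the parked events back into the queue, oldest first.
    fn enter_new_state(&mut self) {
        self.queue.extend(self.postponed.drain(..));
    }

    fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }
}
```

---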
### Engine Handles Postpone Action
```rust
while let Some(event) = mailbox.recv().await {
    let (transition, new_data, actions) = self.handle_event(
        data, event).await;
    ...
    if let Some(event) = actions.postpone {
        mailbox.postpone(Event::External(event));
    }
    if Self::state_enter() && (is_repeat || self != old_state) {
        mailbox.enter_new_state();
        mailbox.push(Event::Enter(old_state));
        old_state = self.clone()
    }
}
```
---
## 7. Timeout Action
---
Timeout is Event
---
- Event Timeout: Cleared on any event
- State Timeout: Cleared on entering new state
- Named Timeout: Manual Clearing
---
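The three clearing rules above can be written down as plain bookkeeping. In this sketch deadlines are abstract `u64` ticks rather than real clock values, and `Timers` with its methods is a hypothetical structure, not something `gen_statem` or Tokio provides.

```rust
use std::collections::HashMap;

#[derive(Default)]
struct Timers {
    event_timeout: Option<u64>,
    state_timeout: Option<u64>,
    named: HashMap<String, u64>,
}

impl Timers {
    /// Called for every event: clears the event timeout only.
    fn on_event(&mut self) {
        self.event_timeout = None;
    }

    /// Called when a new state is entered: clears the state timeout.
    fn on_state_change(&mut self) {
        self.state_timeout = None;
    }

    /// Named timeouts survive events and state changes until cancelled.
    fn cancel(&mut self, name: &str) {
        self.named.remove(name);
    }
}
```

---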
## Tokio Example 7: State Timeout
Auto Turn Off
[full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/07_state_timeout.rs)
---
### Event
```rust
enum Event<S, E> {
    External(E),
    Enter(S),
    StateTimeout,
}
```
---
### Let the Mailbox Generate the Timeout Event
```rust
struct Mailbox<S, E> {
    rx: mpsc::Receiver<Event<S, E>>,
    deadline: Option<Instant>,
    queue: VecDeque<Event<S, E>>,
    postponed: Vec<Event<S, E>>,
}
```
---
```rust
match self.deadline {
    Some(deadline) => {
        tokio::select! {
            event = self.rx.recv() => event,
            _ = sleep_until(deadline) => {
                self.deadline = None;
                Some(Event::StateTimeout)
            }
        }
    }
    None => self.rx.recv().await,
}
```
---
```rust
fn enter_new_state(&mut self) {
    // Auto clear on entering new state
    self.deadline = None;
    self.queue.extend(self.postponed.drain(..));
}
```
---
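The same "receive or time out" choice can be tried with the standard library alone. This sketch assumes `std::sync::mpsc` and its `recv_timeout` in place of `tokio::select!` with `sleep_until`; the `Mailbox` here is trimmed to the two fields that matter and is not the struct from the linked example.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Instant;

#[derive(Debug, PartialEq)]
enum Event<E> {
    External(E),
    StateTimeout,
}

struct Mailbox<E> {
    rx: mpsc::Receiver<E>,
    deadline: Option<Instant>,
}

impl<E> Mailbox<E> {
    /// Returns `None` once all senders are gone.
    fn recv(&mut self) -> Option<Event<E>> {
        match self.deadline {
            None => self.rx.recv().ok().map(Event::External),
            Some(deadline) => {
                let remaining = deadline.saturating_duration_since(Instant::now());
                match self.rx.recv_timeout(remaining) {
                    Ok(event) => Some(Event::External(event)),
                    Err(RecvTimeoutError::Timeout) => {
                        // The timeout fires once; disarm it before reporting.
                        self.deadline = None;
                        Some(Event::StateTimeout)
                    }
                    Err(RecvTimeoutError::Disconnected) => None,
                }
            }
        }
    }
}
```

---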
## 8. Inserted Events Action
---
### Use Cases
- Preprocess
- State Machines Composition
---
#### Use Case: Preprocess
Combine Key Down and Key Up into Key Press Event
---
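The key press use case can be sketched as a tiny synchronous state machine. Everything here is an illustrative assumption (the `Idle`/`Held` states, the `run` driver, collecting presses into a `Vec`); the one behavior it mirrors from `gen_statem` is that inserted events are handled before events already waiting in the queue.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Event {
    KeyDown(char),
    KeyUp(char),
    // Inserted by the state machine itself, never sent from outside.
    KeyPress(char),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Idle,
    Held(char),
}

/// Returns the next state plus the events to insert.
fn handle_event(state: State, event: Event, pressed: &mut Vec<char>) -> (State, Vec<Event>) {
    match (state, event) {
        (State::Idle, Event::KeyDown(k)) => (State::Held(k), vec![]),
        // A matching down/up pair becomes one inserted `KeyPress`.
        (State::Held(k), Event::KeyUp(up)) if k == up => {
            (State::Idle, vec![Event::KeyPress(k)])
        }
        (s, Event::KeyPress(k)) => {
            pressed.push(k);
            (s, vec![])
        }
        // Anything else is ignored.
        (s, _) => (s, vec![]),
    }
}

fn run(external: Vec<Event>) -> Vec<char> {
    let mut queue: VecDeque<Event> = external.into();
    let mut state = State::Idle;
    let mut pressed = Vec::new();
    while let Some(event) = queue.pop_front() {
        let (next, inserted) = handle_event(state, event, &mut pressed);
        state = next;
        // Inserted events jump ahead of everything already queued.
        for e in inserted.into_iter().rev() {
            queue.push_front(e);
        }
    }
    pressed
}
```

---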
## Tokio Example 8: Inserted Events
[full code](https://github.com/doitian/from-statem-to-tokio/blob/main/examples/08_inserted_events.rs)
---
### Design
Preprocess external `TurnOn` and `Click` events into internal `TurnOn` and `TurnOff` events
---
### Differences
- Erlang allows inserting events of any type
- This example only inserts internal events
---
### Event
```rust
enum Event<S, E, I> {
    External(E),
    Enter(S),
    StateTimeout,
    Internal(I),
}
```
---
### Callback
```rust
(Switch::On, Event::External(ExternalEvent::Click)) => (
    Transition::NewState(self),
    data,
    ActionsBuilder::new()
        .new_event(InternalEvent::TurnOff)
        .build(),
),
```
---
```rust
(Switch::Off, Event::External(ExternalEvent::Click)) => (
    Transition::NewState(self),
    data,
    ActionsBuilder::new()
        .new_event(InternalEvent::TurnOn)
        .build(),
),
```
---
```rust
(Switch::On | Switch::Off, Event::External(ExternalEvent::TurnOn)) => (
    Transition::NewState(self),
    data,
    ActionsBuilder::new()
        .new_event(InternalEvent::TurnOn)
        .build(),
),
```
---
### Internal Event Handler
```rust
(_, Event::Internal(InternalEvent::TurnOn)) => (
    Transition::RepeatState(Switch::On),
    data,
    Actions::default(),
),
(_, Event::Internal(InternalEvent::TurnOff)) => {
    (Transition::NewState(Switch::Off), data, Actions::default())
}
```
---
### Engine
```rust
for internal_event in actions.new_events {
    mailbox.push(Event::Internal(internal_event));
}
```
---
## Recap
---
Behavior = Engine + Callback
---
State(S) x Event(E) -> Actions(A), State(S')
---
Everything is Event
- Enter
- Timeout
---
Useful Actions
- Postpone
- Timeouts
- Inserted Event