PgFlow.LiveClient is a LiveView-native client for tracking flow and job runs in real-time. It manages PubSub subscriptions, loads run snapshots from the database, and applies incremental updates to %PgFlow.Schema.Run{} structs stored in your socket assigns — no manual PubSub wiring required.
Add the :pubsub option to your PgFlow config:
# config/config.exs
config :my_app, MyApp.PgFlow,
repo: MyApp.Repo,
pubsub: MyApp.PubSub,
flows: [MyApp.Flows.ProcessOrder]This tells PgFlow's supervisor to attach telemetry handlers that broadcast run and task events to your PubSub.
defmodule MyAppWeb.OrderLive do
use MyAppWeb, :live_view
alias PgFlow.LiveClient
def mount(_params, _session, socket) do
{:ok, LiveClient.init(socket, pubsub: MyApp.PubSub)}
end
def handle_event("process", %{"order_id" => order_id}, socket) do
case LiveClient.start_flow(socket, MyApp.Flows.ProcessOrder, %{"order_id" => order_id}) do
{:ok, socket} -> {:noreply, socket}
{:error, _reason, socket} -> {:noreply, socket}
end
end
def handle_info({:pgflow, _, _} = msg, socket) do
{:noreply, LiveClient.handle_info(msg, socket)}
end
def render(assigns) do
~H"""
<button phx-click="process" phx-value-order_id="123">Process Order</button>
<div :if={@flow_run}>
<p>Status: {@flow_run.status}</p>
<div :for={step <- @flow_run.step_states}>
<p>{step.step_slug}: {step.status}</p>
</div>
</div>
"""
end
endInitializes the socket for flow tracking. Call this in mount/3.
LiveClient.init(socket, pubsub: MyApp.PubSub)Sets the default assign (:flow_run) to nil and stores PubSub config in socket private. Safe to call during both connected and disconnected mount.
Options:
| Option | Default | Description |
|---|---|---|
:pubsub |
(required) | Your Phoenix.PubSub module |
:as |
:flow_run |
Assign key for the run |
Starts a flow, subscribes to its PubSub topic, loads the initial snapshot, and assigns the %Run{}.
case LiveClient.start_flow(socket, :process_order, %{"order_id" => 123}) do
{:ok, socket} -> {:noreply, socket}
{:error, reason, socket} -> {:noreply, socket}
endAccepts a flow module, slug atom, or string slug as the second argument.
Options:
| Option | Default | Description |
|---|---|---|
:as |
:flow_run |
Assign key for the run |
Starts a job and subscribes to updates. Naming parity with PgFlow.enqueue/2. Behaves identically to start_flow/4.
case LiveClient.enqueue(socket, MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}) do
{:ok, socket} -> {:noreply, socket}
{:error, reason, socket} -> {:noreply, socket}
endSubscribes to an existing run by ID. Useful for run detail pages where the run was started elsewhere.
socket = LiveClient.subscribe(socket, run_id)Options:
| Option | Default | Description |
|---|---|---|
:as |
:flow_run |
Assign key for the run |
Unsubscribes from a run and resets the assign to nil.
socket = LiveClient.unsubscribe(socket)
socket = LiveClient.unsubscribe(socket, as: :my_run)Applies a PubSub message to the tracked run in assigns. Wire this up in your LiveView:
def handle_info({:pgflow, _, _} = msg, socket) do
{:noreply, LiveClient.handle_info(msg, socket)}
endReturns the socket unchanged if the message is for an untracked run.
Use the :as option to track multiple runs simultaneously under different assign keys:
def mount(_params, _session, socket) do
socket =
socket
|> LiveClient.init(pubsub: MyApp.PubSub, as: :order_run)
|> LiveClient.init(pubsub: MyApp.PubSub, as: :email_run)
{:ok, socket}
end
def handle_event("start_order", params, socket) do
{:ok, socket} = LiveClient.start_flow(socket, :process_order, params, as: :order_run)
{:noreply, socket}
end
def handle_event("send_email", params, socket) do
{:ok, socket} = LiveClient.enqueue(socket, MyApp.Jobs.SendEmail, params, as: :email_run)
{:noreply, socket}
endThen access @order_run and @email_run independently in templates.
start_flow/4, enqueue/4, and subscribe/3 all assign a %PgFlow.Schema.Run{} struct with preloaded step states. The struct is updated in-memory as PubSub events arrive — no additional DB queries after the initial load.
| Field | Type | Description |
|---|---|---|
run_id |
binary_id |
UUID of the run |
flow_slug |
string |
Flow or job slug |
status |
string |
"started", "completed", or "failed" |
input |
map |
Input data passed to the flow |
output |
map |
Final output (set on completion) |
remaining_steps |
integer |
Steps not yet completed |
started_at |
datetime |
When the run started |
completed_at |
datetime |
When the run completed |
failed_at |
datetime |
When the run failed |
step_states |
list |
Preloaded %StepState{} structs |
| Field | Type | Description |
|---|---|---|
step_slug |
string |
Step identifier |
status |
string |
"created", "started", "completed", or "failed" |
remaining_tasks |
integer |
Tasks not yet completed |
output |
map |
Step output (set on completion) |
error_message |
string |
Error message (set on failure) |
started_at |
datetime |
When the step started |
completed_at |
datetime |
When the step completed |
failed_at |
datetime |
When the step failed |
LiveClient handles the following PubSub events automatically:
| Event | Effect |
|---|---|
:task_started |
Updates step status to "started" |
:task_completed |
Updates step status to "completed", sets output |
:task_failed |
Updates step status to "failed", sets error message |
:run_completed |
Updates run status to "completed", sets output |
:run_failed |
Updates run status to "failed" |
Updates are idempotent — status only advances forward (created < started < completed < failed). Out-of-order or duplicate messages are safely ignored.
For pages that display a run started elsewhere (e.g., a run detail page), use subscribe/3:
def mount(%{"run_id" => run_id}, _session, socket) do
socket =
socket
|> LiveClient.init(pubsub: MyApp.PubSub)
|> LiveClient.subscribe(run_id)
{:ok, socket}
end