Skip to content

Commit 22b1c87

Browse files
committed
feat: source selector
why yes i have basically implemented a service discover mechanism
1 parent 42af1d3 commit 22b1c87

File tree

16 files changed

+364
-39
lines changed

16 files changed

+364
-39
lines changed

ansible/roles/liquidsoap/templates/[email protected]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ Description=Liquidsoap HLS streamer for stream %i
44

55
[Service]
66
Type=simple
7-
ExecStart=/usr/bin/liquidsoap /usr/local/libexec/liquidsoap/streamer.liq
7+
ExecStart=/usr/local/bin/liq-streamer.sh
88
Environment=STREAM_ID=%i
99
EnvironmentFile=/etc/streaming.env
1010
User=streaming

liq/streamer.liq

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ end
1717
stream_id = get_env("STREAM_ID")
1818
api_base = get_env("STREAM_API_BASE")
1919
api_token = get_env("STREAM_API_TOKEN")
20+
control_port = int_of_string(get_env("PORT"))
2021

2122
def notify_ready() =
2223
# the endpoint doesn't need any request body values, but liquidsoap json.stringify serialises {} as an empty array
@@ -34,6 +35,52 @@ end
3435

3536
in = input.jack(id="streamer_#{stream_id}")
3637

38+
source = ref(2)
39+
40+
starting = amplify(0.5, single("/opt/radio-tx/starting.mp3"))
41+
ended = amplify(0.5, single("/opt/radio-tx/ended.mp3"))
42+
techdiff = amplify(0.5, single("/opt/radio-tx/techdiff.mp3"))
43+
44+
def handle_set_source(request, response) =
45+
response.status_code(200)
46+
new_source = int_of_string(request.query["source"])
47+
if (new_source >= 1 and new_source <= 4 and new_source != source()) then
48+
source.set(new_source)
49+
if (new_source == 2) then
50+
starting.skip()
51+
elsif (new_source == 3) then
52+
ended.skip()
53+
elsif (new_source == 4) then
54+
techdiff.skip()
55+
end
56+
response.json({source=source()})
57+
else
58+
response.json({source=source()})
59+
end
60+
end
61+
62+
def handle_get_source(request, response) =
63+
response.status_code(200)
64+
response.json({source=source()})
65+
end
66+
67+
harbor.http.register(port=control_port, method="POST", "/source/:source", handle_set_source)
68+
harbor.http.register(port=control_port, method="GET", "/source", handle_get_source)
69+
70+
stream = switch(
71+
[
72+
({source() == 1}, in),
73+
({source() == 2}, starting),
74+
({source() == 3}, ended),
75+
({source() == 4}, techdiff),
76+
({true}, techdiff),
77+
],
78+
track_sensitive=false
79+
# transitions=[
80+
# do_nice_fade, do_nice_fade, do_nice_fade, do_nice_fade, do_nice_fade
81+
# ]
82+
)
83+
3784
aac_high =
3885
%ffmpeg(
3986
format = "mpegts",
@@ -96,5 +143,5 @@ output.file.hls(
96143
persist_at="./hls.state",
97144
"/var/www/hls/#{stream_id}/",
98145
streams,
99-
in
146+
stream
100147
)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
Warnings:
3+
4+
- A unique constraint covering the columns `[controlPort]` on the table `Stream` will be added. If there are existing duplicate values, this will fail.
5+
6+
*/
7+
-- AlterTable
8+
ALTER TABLE "Stream" ADD COLUMN "controlPort" INTEGER;
9+
10+
-- CreateIndex
11+
CREATE UNIQUE INDEX "Stream_controlPort_key" ON "Stream"("controlPort");

prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ model Stream {
2929
3030
ingestPoint IngestPoint? @relation(fields: [ingestPointId], references: [id])
3131
ingestPointId String?
32+
controlPort Int? @unique
3233
3334
HlsSegment HlsSegment[]
3435
}

scripts/liq-streamer.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#!/usr/bin/env bash
2+
3+
set -eux -o pipefail
4+
5+
config=$(curl --silent --fail --request POST -H 'Content-Type: application/json' --data '{}' -H "Authorization: Bearer $STREAM_API_TOKEN" "$STREAM_API_BASE/api/stream/$STREAM_ID/prepare")
6+
7+
PORT=$(echo "$config" | jq --raw-output .port)
8+
9+
export PORT
10+
11+
exec /usr/bin/liquidsoap "/usr/local/libexec/liquidsoap/streamer.liq"
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { prisma } from '@/lib/db';
2+
import { env } from '@/lib/env';
3+
import { postHandler } from '@/lib/handlers';
4+
import { badRequest } from '@/lib/responses';
5+
import { NextResponse } from 'next/server';
6+
import { z } from 'zod';
7+
8+
const readySchema = z.object({});
9+
10+
export const POST = postHandler(
11+
readySchema,
12+
async ({ id }: { id: string }) => {
13+
const stream = await prisma.stream.findUniqueOrThrow({
14+
where: { fixtureId: id },
15+
});
16+
if (stream.state === 'Complete') {
17+
return badRequest();
18+
}
19+
if (!stream.ingestPointId) {
20+
return badRequest();
21+
}
22+
23+
// TODO: mark next segment as discontinuous if stream is already marked as live
24+
25+
let port: number;
26+
while (true) {
27+
port = 13500 + Math.floor(Math.random() * 1000);
28+
const portExists = await prisma.stream.count({
29+
where: {
30+
controlPort: port,
31+
},
32+
});
33+
if (portExists === 0) {
34+
break;
35+
}
36+
}
37+
38+
await prisma.stream.update({
39+
where: { fixtureId: id },
40+
data: {
41+
controlPort: port,
42+
},
43+
});
44+
45+
return NextResponse.json({
46+
port,
47+
});
48+
},
49+
{
50+
requireAuthentication: { token: env.STREAM_CONTROLLER_TOKEN! },
51+
}
52+
);

src/app/dashboard/streams/[id]/actions.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
'use server';
22

33
import { action } from '@/lib/forms';
4-
import { startStreamSchema } from './schema';
4+
import { setSourceSchema, startStreamSchema } from './schema';
55
import { prisma } from '@/lib/db';
66
import { SafeError } from '@/lib/form-types';
7-
import { notifyStreamStart, notifyStreamStop } from '@/lib/stream-controller';
7+
import {
8+
notifyStreamStart,
9+
notifyStreamStop,
10+
setStreamSource,
11+
} from '@/lib/stream-controller';
812

913
export const startStream = action(startStreamSchema, async ({ id }) => {
1014
const stream = await prisma.stream.findUniqueOrThrow({
@@ -33,3 +37,13 @@ export const endStream = action(startStreamSchema, async ({ id }) => {
3337

3438
await notifyStreamStop(stream.fixtureId);
3539
});
40+
41+
export const setSource = action(setSourceSchema, async ({ id, source }) => {
42+
const stream = await prisma.stream.findUniqueOrThrow({
43+
where: { fixtureId: id },
44+
});
45+
46+
await setStreamSource(stream, source);
47+
48+
return source;
49+
});

src/app/dashboard/streams/[id]/components.tsx

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
'use client';
22

33
import { startTransition, useActionState } from 'react';
4-
import { endStream, startStream } from './actions';
4+
import { endStream, setSource, startStream } from './actions';
55
import { notifications } from '@mantine/notifications';
6-
import { resultMessage } from '@/lib/form-types';
7-
import { Button, Stack, Text } from '@mantine/core';
6+
import { Result, resultMessage } from '@/lib/form-types';
7+
import { Button, ButtonGroup, Stack, Text } from '@mantine/core';
88

99
export function GoLiveButton({ id }: { id: string }) {
1010
const [state, action, pending] = useActionState(
@@ -65,3 +65,88 @@ export function EndStreamButton({ id }: { id: string }) {
6565
</Stack>
6666
);
6767
}
68+
69+
export function SourceSelector({
70+
id,
71+
currentSource: initialSource,
72+
}: {
73+
id: string;
74+
currentSource: number;
75+
}) {
76+
const [state, action, pending] = useActionState<Result<number>, number>(
77+
async (_, source) => {
78+
const res = await setSource({ id, source });
79+
return res;
80+
},
81+
{ ok: true, data: initialSource }
82+
);
83+
84+
const currentSource = state.ok ? state.data : -1;
85+
86+
return (
87+
<Stack>
88+
<Text aria-live="polite">{resultMessage(state)}</Text>
89+
<ButtonGroup>
90+
<SourceSelectButton
91+
id={id}
92+
currentSource={currentSource}
93+
pending={pending}
94+
action={action}
95+
source={1}
96+
label="PGM"
97+
/>
98+
<SourceSelectButton
99+
id={id}
100+
currentSource={currentSource}
101+
pending={pending}
102+
action={action}
103+
source={2}
104+
label="SRT"
105+
/>
106+
<SourceSelectButton
107+
id={id}
108+
currentSource={currentSource}
109+
pending={pending}
110+
action={action}
111+
source={3}
112+
label="END"
113+
/>
114+
<SourceSelectButton
115+
id={id}
116+
currentSource={currentSource}
117+
pending={pending}
118+
action={action}
119+
source={4}
120+
label="TDF"
121+
/>
122+
</ButtonGroup>
123+
</Stack>
124+
);
125+
}
126+
127+
export function SourceSelectButton({
128+
source,
129+
label,
130+
pending,
131+
currentSource,
132+
action,
133+
}: {
134+
id: string;
135+
source: number;
136+
label: string;
137+
pending: boolean;
138+
currentSource: number;
139+
action: (payload: number) => void;
140+
}) {
141+
return (
142+
<Button
143+
loading={pending}
144+
onClick={() => {
145+
startTransition(() => action(source));
146+
}}
147+
variant={currentSource === source ? 'filled' : 'default'}
148+
>
149+
{label}
150+
</Button>
151+
);
152+
}

src/app/dashboard/streams/[id]/page.tsx

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ import StreamPlayer from '@/components/StreamPlayer';
22
import { prisma } from '@/lib/db';
33
import { Button, Group, Title } from '@mantine/core';
44
import { notFound } from 'next/navigation';
5-
import { EndStreamButton, GoLiveButton } from './components';
5+
import { EndStreamButton, GoLiveButton, SourceSelector } from './components';
66
import Link from 'next/link';
7+
import { getStreamSource } from '@/lib/stream-controller';
78

89
export default async function StreamPage({
910
params,
@@ -21,6 +22,11 @@ export default async function StreamPage({
2122
return notFound();
2223
}
2324

25+
let currentSource: number | null = null;
26+
if (stream.state === 'Live') {
27+
currentSource = (await getStreamSource(stream)).source;
28+
}
29+
2430
return (
2531
<>
2632
<Group>
@@ -51,6 +57,11 @@ export default async function StreamPage({
5157
{stream.state === 'Live' && (
5258
<Group>
5359
<EndStreamButton id={stream.fixtureId} />
60+
61+
<SourceSelector
62+
id={stream.fixtureId}
63+
currentSource={currentSource!}
64+
/>
5465
</Group>
5566
)}
5667
</>

src/app/dashboard/streams/[id]/schema.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,8 @@ import { z } from 'zod';
33
export const startStreamSchema = z.object({
44
id: z.string(),
55
});
6+
7+
export const setSourceSchema = z.object({
8+
id: z.string(),
9+
source: z.number().refine((v) => v >= 1 && v <= 4),
10+
});

0 commit comments

Comments
 (0)