-
Notifications
You must be signed in to change notification settings - Fork 115
Expand file tree
/
Copy pathagent_dispatch.py
More file actions
61 lines (47 loc) · 1.75 KB
/
agent_dispatch.py
File metadata and controls
61 lines (47 loc) · 1.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import asyncio
from livekit import api
room_name = "my-room"
agent_name = "test-agent"
"""
This example demonstrates how to have an agent join a room
without using the automatic dispatch. In order to use this
feature, you must have an agent running with `agent_name` set
when defining your WorkerOptions. A dispatch requests the
agent to enter a specific room with optional metadata.
"""
async def create_explicit_dispatch():
lkapi = api.LiveKitAPI()
dispatch = await lkapi.agent_dispatch.create_dispatch(
api.CreateAgentDispatchRequest(
agent_name=agent_name, room=room_name, metadata="my_metadata"
)
)
print("created dispatch", dispatch)
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
print(f"there are {len(dispatches)} dispatches in {room_name}")
await lkapi.aclose()
"""
When agent name is set, the agent will no longer be automatically dispatched
to new rooms. If you want that agent to be dispatched to a new room as soon as
the participant connects, you can set the RoomConfiguration with the agent
definition in the access token.
"""
async def create_token_with_agent_dispatch() -> str:
token = (
api.AccessToken()
.with_identity("my_participant")
.with_grants(api.VideoGrants(room_join=True, room=room_name))
.with_room_config(
api.RoomConfiguration(
agents=[api.RoomAgentDispatch(agent_name="test-agent", metadata="my_metadata")],
),
)
.to_jwt()
)
return token
async def main():
token = await create_token_with_agent_dispatch()
print("created participant token", token)
print("creating explicit dispatch")
await create_explicit_dispatch()
asyncio.run(main())