Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/lk/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ var (
ArgsUsage: "[working-dir]",
},
privateLinkCommands,
simulateCommand,
},
},
}
Expand Down
29 changes: 12 additions & 17 deletions cmd/lk/agent_private_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"fmt"
"strconv"

"github.com/livekit/livekit-cli/v2/pkg/util"
lkproto "github.com/livekit/protocol/livekit"
Expand Down Expand Up @@ -87,11 +86,9 @@ var privateLinkCommands = &cli.Command{
},
}

func buildCreatePrivateLinkRequest(name, region string, port uint32, awsEndpoint string) *lkproto.CreatePrivateLinkRequest {
func buildCreatePrivateLinkRequest(name, awsEndpoint string) *lkproto.CreatePrivateLinkRequest {
return &lkproto.CreatePrivateLinkRequest{
Name: name,
Region: region,
Port: port,
Name: name,
Config: &lkproto.CreatePrivateLinkRequest_Aws{
Aws: &lkproto.CreatePrivateLinkRequest_AWSCreateConfig{
Endpoint: awsEndpoint,
Expand All @@ -104,14 +101,14 @@ func privateLinkServiceDNS(name, projectID string) string {
return fmt.Sprintf("%s-%s.plg.svc", name, projectID)
}

func buildPrivateLinkListRows(links []*lkproto.PrivateLink, healthByID map[string]*lkproto.PrivateLinkStatus, healthErrByID map[string]error) [][]string {
func buildPrivateLinkListRows(links []*lkproto.PrivateLink, healthByID map[string]*lkproto.PrivateLinkHealthStatus, healthErrByID map[string]error) [][]string {
var rows [][]string
for _, link := range links {
if link == nil {
continue
}

status := lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_UNKNOWN.String()
status := lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_UNKNOWN.String()
updatedAt := "-"

if err, ok := healthErrByID[link.PrivateLinkId]; ok && err != nil {
Expand All @@ -127,8 +124,6 @@ func buildPrivateLinkListRows(links []*lkproto.PrivateLink, healthByID map[strin
rows = append(rows, []string{
link.PrivateLinkId,
link.Name,
link.Region,
strconv.FormatUint(uint64(link.Port), 10),
status,
updatedAt,
})
Expand All @@ -144,7 +139,7 @@ func formatPrivateLinkClientError(action string, err error) error {
}

func createPrivateLink(ctx context.Context, cmd *cli.Command) error {
req := buildCreatePrivateLinkRequest(cmd.String("name"), cmd.String("region"), uint32(cmd.Uint("port")), cmd.String("endpoint"))
req := buildCreatePrivateLinkRequest(cmd.String("name"), cmd.String("endpoint"))
resp, err := agentsClient.CreatePrivateLink(ctx, req)
if err != nil {
return formatPrivateLinkClientError("create", err)
Expand Down Expand Up @@ -173,13 +168,13 @@ func listPrivateLinks(ctx context.Context, cmd *cli.Command) error {
return formatPrivateLinkClientError("list", err)
}

healthByID := make(map[string]*lkproto.PrivateLinkStatus, len(resp.Items))
healthByID := make(map[string]*lkproto.PrivateLinkHealthStatus, len(resp.Items))
healthErrByID := make(map[string]error)
for _, link := range resp.Items {
if link == nil || link.PrivateLinkId == "" {
continue
}
health, healthErr := agentsClient.GetPrivateLinkStatus(ctx, &lkproto.GetPrivateLinkStatusRequest{
health, healthErr := agentsClient.GetPrivateLinkHealthStatus(ctx, &lkproto.GetPrivateLinkHealthStatusRequest{
PrivateLinkId: link.PrivateLinkId,
})
if healthErr != nil {
Expand All @@ -193,9 +188,9 @@ func listPrivateLinks(ctx context.Context, cmd *cli.Command) error {

if cmd.Bool("json") {
type privateLinkWithHealth struct {
PrivateLink *lkproto.PrivateLink `json:"private_link"`
Status *lkproto.PrivateLinkStatus `json:"health"`
HealthError string `json:"health_error,omitempty"`
PrivateLink *lkproto.PrivateLink `json:"private_link"`
Status *lkproto.PrivateLinkHealthStatus `json:"health"`
HealthError string `json:"health_error,omitempty"`
}
items := make([]privateLinkWithHealth, 0, len(resp.Items))
for _, link := range resp.Items {
Expand All @@ -221,7 +216,7 @@ func listPrivateLinks(ctx context.Context, cmd *cli.Command) error {
}

rows := buildPrivateLinkListRows(resp.Items, healthByID, healthErrByID)
table := util.CreateTable().Headers("ID", "Name", "Region", "Port", "Health", "Updated At").Rows(rows...)
table := util.CreateTable().Headers("ID", "Name", "Health", "Updated At").Rows(rows...)
fmt.Println(table)
return nil
}
Expand All @@ -245,7 +240,7 @@ func deletePrivateLink(ctx context.Context, cmd *cli.Command) error {

func getPrivateLinkHealthStatus(ctx context.Context, cmd *cli.Command) error {
privateLinkID := cmd.String("id")
resp, err := agentsClient.GetPrivateLinkStatus(ctx, &lkproto.GetPrivateLinkStatusRequest{
resp, err := agentsClient.GetPrivateLinkHealthStatus(ctx, &lkproto.GetPrivateLinkHealthStatusRequest{
PrivateLinkId: privateLinkID,
})
if err != nil {
Expand Down
34 changes: 11 additions & 23 deletions cmd/lk/agent_private_link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,10 @@ func TestAgentPrivateLinkCommandTree(t *testing.T) {
}

func TestBuildCreatePrivateLinkRequest_HappyPath(t *testing.T) {
req := buildCreatePrivateLinkRequest("orders-db", "us-east-1", 6379, "com.amazonaws.vpce.us-east-1.vpce-svc-abc123")
req := buildCreatePrivateLinkRequest("orders-db", "com.amazonaws.vpce.us-east-1.vpce-svc-abc123")
require.NotNil(t, req)

assert.Equal(t, "orders-db", req.Name)
assert.Equal(t, "us-east-1", req.Region)
assert.Equal(t, uint32(6379), req.Port)

aws := req.GetAws()
require.NotNil(t, aws)
Expand All @@ -62,7 +60,7 @@ func TestPrivateLinkServiceDNS(t *testing.T) {
}

func TestBuildPrivateLinkListRows_EmptyList(t *testing.T) {
rows := buildPrivateLinkListRows([]*lkproto.PrivateLink{}, map[string]*lkproto.PrivateLinkStatus{}, map[string]error{})
rows := buildPrivateLinkListRows([]*lkproto.PrivateLink{}, map[string]*lkproto.PrivateLinkHealthStatus{}, map[string]error{})
assert.Empty(t, rows)
}

Expand All @@ -71,15 +69,13 @@ func TestBuildPrivateLinkListRows_OnePrivateLink(t *testing.T) {
{
PrivateLinkId: "pl-1",
Name: "orders-db",
Region: "us-east-1",
Port: 6379,
},
}

now := time.Now().UTC()
healthByID := map[string]*lkproto.PrivateLinkStatus{
healthByID := map[string]*lkproto.PrivateLinkHealthStatus{
"pl-1": {
Status: lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE,
Status: lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY,
UpdatedAt: timestamppb.New(now),
},
}
Expand All @@ -88,41 +84,33 @@ func TestBuildPrivateLinkListRows_OnePrivateLink(t *testing.T) {
require.Len(t, rows, 1)
assert.Equal(t, "pl-1", rows[0][0])
assert.Equal(t, "orders-db", rows[0][1])
assert.Equal(t, "us-east-1", rows[0][2])
assert.Equal(t, "6379", rows[0][3])
assert.Equal(t, lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE.String(), rows[0][4])
assert.Equal(t, lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY.String(), rows[0][2])
}

func TestBuildPrivateLinkListRows_TwoPrivateLinksDifferentRegions(t *testing.T) {
func TestBuildPrivateLinkListRows_TwoPrivateLinks(t *testing.T) {
links := []*lkproto.PrivateLink{
{
PrivateLinkId: "pl-1",
Name: "orders-db",
Region: "us-east-1",
Port: 6379,
},
{
PrivateLinkId: "pl-2",
Name: "cache",
Region: "eu-west-1",
Port: 6380,
},
}

healthByID := map[string]*lkproto.PrivateLinkStatus{
healthByID := map[string]*lkproto.PrivateLinkHealthStatus{
"pl-1": {
Status: lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE,
Status: lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY,
},
"pl-2": {
Status: lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE,
Status: lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY,
},
}

rows := buildPrivateLinkListRows(links, healthByID, map[string]error{})
require.Len(t, rows, 2)

assert.Equal(t, "us-east-1", rows[0][2])
assert.Equal(t, "eu-west-1", rows[1][2])
assert.Equal(t, lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE.String(), rows[0][4])
assert.Equal(t, lkproto.PrivateLinkStatus_PRIVATE_LINK_STATUS_AVAILABLE.String(), rows[1][4])
assert.Equal(t, lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY.String(), rows[0][2])
assert.Equal(t, lkproto.PrivateLinkHealthStatus_PRIVATE_LINK_ATTACHMENT_HEALTH_STATUS_HEALTHY.String(), rows[1][2])
}
145 changes: 108 additions & 37 deletions cmd/lk/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,59 @@ package main
import (
"context"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"time"

tea "github.com/charmbracelet/bubbletea"
"github.com/charmbracelet/lipgloss"
"github.com/urfave/cli/v3"

"github.com/livekit/livekit-cli/v2/pkg/agentfs"
"github.com/livekit/livekit-cli/v2/pkg/console"
"github.com/livekit/livekit-cli/v2/pkg/portaudio"
)

var ConsoleCommands = []*cli.Command{
{
Name: "console",
Usage: "Voice chat with an agent via mic/speakers",
Category: "Core",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "port",
Aliases: []string{"p"},
Usage: "TCP port for agent communication",
Value: 0,
},
&cli.StringFlag{
Name: "input-device",
Usage: "Input device index or name substring",
},
&cli.StringFlag{
Name: "output-device",
Usage: "Output device index or name substring",
},
&cli.BoolFlag{
Name: "list-devices",
Usage: "List available audio devices and exit",
},
&cli.BoolFlag{
Name: "no-aec",
Usage: "Disable acoustic echo cancellation",
},
func init() {
AgentCommands[0].Commands = append(AgentCommands[0].Commands, consoleCommand)
}

var consoleCommand = &cli.Command{
Name: "console",
Usage: "Voice chat with an agent via mic/speakers",
Category: "Core",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "port",
Aliases: []string{"p"},
Usage: "TCP port for agent communication",
Value: 0,
},
&cli.StringFlag{
Name: "input-device",
Usage: "Input device index or name substring",
},
&cli.StringFlag{
Name: "output-device",
Usage: "Output device index or name substring",
},
&cli.BoolFlag{
Name: "list-devices",
Usage: "List available audio devices and exit",
},
&cli.BoolFlag{
Name: "no-aec",
Usage: "Disable acoustic echo cancellation",
},
&cli.StringFlag{
Name: "entrypoint",
Usage: "Agent entrypoint `FILE` (default: auto-detect)",
},
Action: runConsole,
},
Action: runConsole,
}

func runConsole(ctx context.Context, cmd *cli.Command) error {
Expand Down Expand Up @@ -103,19 +114,75 @@ func runConsole(ctx context.Context, cmd *cli.Command) error {
defer server.Close()

actualAddr := server.Addr().String()
fmt.Fprintf(os.Stderr, "Listening on %s\n", actualAddr)
fmt.Fprintf(os.Stderr, "Input: %s\n", inputDev.Name)
fmt.Fprintf(os.Stderr, "Output: %s\n", outputDev.Name)
fmt.Fprintf(os.Stderr, "Waiting for agent connection...\n")

conn, err := server.Accept()
// Detect project type, walking up parent directories if needed.
projectDir, projectType, err := agentfs.DetectProjectRoot(".")
if err != nil {
return err
}
if !projectType.IsPython() {
return fmt.Errorf("console currently only supports Python agents (detected: %s)", projectType)
}

// Resolve entrypoint relative to project root
entrypoint, err := findEntrypoint(projectDir, cmd.String("entrypoint"), projectType)
if err != nil {
return err
}

fmt.Fprintf(os.Stderr, "Starting agent (%s in %s)...\n", entrypoint, projectDir)
agentProc, err := startAgent(AgentStartConfig{
Dir: projectDir,
Entrypoint: entrypoint,
ProjectType: projectType,
CLIArgs: []string{"console", "--connect-addr", actualAddr},
})
if err != nil {
return fmt.Errorf("agent connection: %w", err)
return fmt.Errorf("failed to start agent: %w", err)
}
defer conn.Close()
defer agentProc.Kill()

// Stream agent logs to the TUI
agentProc.LogStream = make(chan string, 128)

fmt.Fprintf(os.Stderr, "Agent connected from %s\n", conn.RemoteAddr())
// Wait for TCP connection, agent crash, timeout, or cancellation
type acceptResult struct {
conn net.Conn
err error
}
acceptCh := make(chan acceptResult, 1)
go func() {
conn, err := server.Accept()
acceptCh <- acceptResult{conn, err}
}()

var conn net.Conn
select {
case res := <-acceptCh:
if res.err != nil {
return fmt.Errorf("agent connection: %w", res.err)
}
conn = res.conn
case err := <-agentProc.Done():
logs := agentProc.RecentLogs(20)
for _, l := range logs {
fmt.Fprintln(os.Stderr, l)
}
if err != nil {
return fmt.Errorf("agent exited before connecting: %w", err)
}
return fmt.Errorf("agent exited before connecting")
case <-time.After(60 * time.Second):
logs := agentProc.RecentLogs(20)
for _, l := range logs {
fmt.Fprintln(os.Stderr, l)
}
return fmt.Errorf("timed out waiting for agent to connect")
case <-ctx.Done():
return ctx.Err()
}
pipeline, err := console.NewPipeline(console.PipelineConfig{
InputDevice: inputDev,
OutputDevice: outputDev,
Expand All @@ -133,8 +200,11 @@ func runConsole(ctx context.Context, cmd *cli.Command) error {
pipeline.Start(pipelineCtx)
}()

model := newConsoleModel(pipeline, actualAddr, inputDev, outputDev)
p := tea.NewProgram(model, tea.WithAltScreen())
// Redirect Go's default logger to discard so it doesn't corrupt the TUI
log.SetOutput(io.Discard)

model := newConsoleModel(pipeline, agentProc, inputDev.Name, outputDev.Name)
p := tea.NewProgram(model)

if _, err := p.Run(); err != nil {
return err
Expand Down Expand Up @@ -184,3 +254,4 @@ func listDevices() error {

return nil
}

Loading