Haystack/backend/events.go
John Costa fa127c2331 feat: event agent calling location agent about location ID
This is pretty nice. We can now have agents spawn other agents and
actually get super cool functionality from it.

The pattern might be a little fragile.
2025-04-16 14:43:07 +01:00

140 lines
3.2 KiB
Go

package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"screenmark/screenmark/agents"
"screenmark/screenmark/models"
"time"
"github.com/google/uuid"
"github.com/lib/pq"
)
// ListenNewImageEvents subscribes to the Postgres "new_image" notification
// channel and, for every notification, processes the referenced image in its
// own goroutine.
//
// The notification payload is expected to be the image UUID. For each valid
// payload an unbuffered status channel is registered in
// eventManager.listeners under the raw payload string, then a goroutine
// builds the note, contact, location and event agents, marks the image as
// processing, runs the orchestrator agent over it and finally marks the image
// as finished (even when the orchestrator run returned an error).
//
// Payloads that are not valid UUIDs and nil notifications are skipped instead
// of crashing the loop.
//
// The function blocks for as long as the listener's Notify channel is open.
// It panics if the initial LISTEN fails, if the pq listener reports an error
// through its event callback, or if constructing any agent fails.
//
// NOTE(review): eventManager.listeners is written here without any locking
// while ListenProcessingImageStatus reads and deletes from the same map; if
// both loops run in separate goroutines that access is unsynchronised.
func ListenNewImageEvents(db *sql.DB, eventManager *EventManager) {
	listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
		if err != nil {
			panic(err)
		}
	})
	defer listener.Close()

	locationModel := models.NewLocationModel(db)
	eventModel := models.NewEventModel(db)
	noteModel := models.NewNoteModel(db)
	imageModel := models.NewImageModel(db)
	contactModel := models.NewContactModel(db)

	if err := listener.Listen("new_image"); err != nil {
		panic(err)
	}

	for {
		parameters, ok := <-listener.Notify
		if !ok {
			// Channel closed: nothing more will ever arrive.
			return
		}
		if parameters == nil {
			// pq delivers a nil notification after it re-establishes a lost
			// connection; there is no payload to act on.
			continue
		}

		imageID, err := uuid.Parse(parameters.Extra)
		if err != nil {
			// A malformed payload must not take the whole listener down.
			log.Printf("Ignoring new_image notification with invalid UUID %q: %v", parameters.Extra, err)
			continue
		}

		eventManager.listeners[parameters.Extra] = make(chan string)
		ctx := context.Background()

		go func() {
			noteAgent, err := agents.NewNoteAgent(noteModel)
			if err != nil {
				panic(err)
			}
			contactAgent, err := agents.NewContactAgent(contactModel)
			if err != nil {
				panic(err)
			}
			locationAgent, err := agents.NewLocationAgent(locationModel)
			if err != nil {
				panic(err)
			}
			// The event agent is handed the location agent so it can call it.
			eventAgent, err := agents.NewEventAgent(eventModel, locationAgent)
			if err != nil {
				panic(err)
			}

			image, err := imageModel.GetToProcessWithData(ctx, imageID)
			if err != nil {
				log.Println("Failed to GetToProcessWithData")
				log.Println(err)
				return
			}

			if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
				log.Println("Failed to StartProcessing")
				log.Println(err)
				return
			}

			orchestrator, err := agents.NewOrchestratorAgent(noteAgent, contactAgent, locationAgent, eventAgent, image.Image.ImageName, image.Image.Image)
			if err != nil {
				panic(err)
			}

			// Still need to find some way to hide this complexity away.
			// I don't think wrapping agents in structs actually works too well.
			err = orchestrator.Client.RunAgent(agents.OrchestratorPrompt, agents.OrchestratorTools, "noAction", nil, image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image)
			if err != nil {
				log.Println(err)
			}

			// NOTE(review): any result of FinishProcessing is discarded here;
			// confirm whether it returns an error that should be logged.
			imageModel.FinishProcessing(ctx, image.ID)
		}()
	}
}
// EventManager keeps track of the status channels of images that are
// currently being processed.
type EventManager struct {
	// listeners is keyed by the image UUID in string form; the value is the
	// channel on which that image's status is delivered.
	listeners map[string]chan string
}

// NewEventManager returns an EventManager whose listener registry is
// initialised and empty, so it can be written to immediately.
func NewEventManager() EventManager {
	registry := map[string]chan string{}
	return EventManager{listeners: registry}
}
// ListenProcessingImageStatus subscribes to the Postgres
// "new_processing_image_status" notification channel and forwards each status
// to the channel registered for that image in eventManager.listeners.
//
// The notification payload is expected to be a 36-character UUID string
// immediately followed by the status text. When a channel is registered under
// that UUID, the status is sent on it, the channel is closed and its map entry
// is removed, so only one status is delivered per registration. Notifications
// for unknown UUIDs are ignored.
//
// Nil notifications and payloads shorter than a UUID are skipped instead of
// crashing the loop.
//
// The db parameter is not used by this function. The function blocks for as
// long as the listener's Notify channel is open, and panics if the initial
// LISTEN fails or if the pq listener reports an error through its event
// callback.
//
// NOTE(review): the send below blocks until a receiver is ready when the
// registered channel is unbuffered, which stalls every later notification;
// confirm that a reader always exists for registered channels.
func ListenProcessingImageStatus(db *sql.DB, eventManager *EventManager) {
	// Length of the textual UUID prefix in the payload.
	const uuidLen = 36

	listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
		if err != nil {
			panic(err)
		}
	})
	defer listener.Close()

	if err := listener.Listen("new_processing_image_status"); err != nil {
		panic(err)
	}

	for {
		data, ok := <-listener.Notify
		if !ok {
			// Channel closed: nothing more will ever arrive.
			return
		}
		if data == nil {
			// pq delivers a nil notification after it re-establishes a lost
			// connection; there is no payload to act on.
			continue
		}
		if len(data.Extra) < uuidLen {
			// Slicing a short payload would panic; skip it instead.
			fmt.Printf("Ignoring malformed status payload: %q\n", data.Extra)
			continue
		}

		stringUuid := data.Extra[:uuidLen]
		status := data.Extra[uuidLen:]

		fmt.Printf("UUID: %s\n", stringUuid)
		fmt.Printf("Receiving: %s\n", data.Extra)

		imageListener, exists := eventManager.listeners[stringUuid]
		if !exists {
			continue
		}

		imageListener <- status

		close(imageListener)
		delete(eventManager.listeners, stringUuid)
	}
}