Haystack/backend/events.go
John Costa 0fcdd73a47 feat(sse): very rough events. Not used in the client yet
feat(sse): very rough events. Not used in the client yet
2025-04-13 14:27:59 +01:00

135 lines
3.1 KiB
Go

package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"screenmark/screenmark/agents"
"screenmark/screenmark/models"
"time"
"github.com/google/uuid"
"github.com/lib/pq"
)
// ListenNewImageEvents subscribes to the Postgres "new_image" notification
// channel and processes every announced image on its own goroutine.
//
// The notification payload is expected to be the image's UUID. For each valid
// payload a status channel is registered in eventManager.listeners under that
// UUID string, then the image is loaded, marked as processing, run through the
// orchestrator agent and finally marked as finished.
//
// The function blocks for as long as the listener's Notify channel is open, so
// callers are expected to run it on a dedicated goroutine. It panics when the
// listener callback receives an error or when the initial LISTEN fails.
//
// NOTE(review): eventManager.listeners is a plain map that is written here and
// read/deleted in ListenProcessingImageStatus. Both functions loop forever, so
// they presumably run on separate goroutines; if so the map needs a mutex.
//
// NOTE(review): panicking in the listener callback aborts the process on any
// reported connection error, which bypasses pq.Listener's automatic
// reconnection. Confirm this crash-on-error behaviour is intended.
func ListenNewImageEvents(db *sql.DB, eventManager *EventManager) {
	listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
		if err != nil {
			panic(err)
		}
	})
	defer listener.Close()

	locationModel := models.NewLocationModel(db)
	eventModel := models.NewEventModel(db)
	noteModel := models.NewNoteModel(db)
	imageModel := models.NewImageModel(db)
	contactModel := models.NewContactModel(db)

	if err := listener.Listen("new_image"); err != nil {
		panic(err)
	}

	// Ranging (instead of a single-case select) ends the loop if Notify is ever
	// closed rather than spinning on zero values.
	for notification := range listener.Notify {
		// lib/pq delivers a nil notification after it re-establishes a lost
		// connection; there is no payload to act on in that case.
		if notification == nil {
			continue
		}

		// A malformed payload must not take the whole listener down.
		imageID, err := uuid.Parse(notification.Extra)
		if err != nil {
			log.Printf("Ignoring new_image notification with invalid UUID %q: %v", notification.Extra, err)
			continue
		}

		// Exactly one status is sent on this channel before it is closed, so a
		// buffer of one lets the sender proceed even when nobody is receiving yet.
		eventManager.listeners[notification.Extra] = make(chan string, 1)

		ctx := context.Background()

		go func() {
			locationAgent, err := agents.NewLocationEventAgent(locationModel, eventModel, contactModel)
			if err != nil {
				panic(err)
			}

			noteAgent, err := agents.NewNoteAgent(noteModel)
			if err != nil {
				panic(err)
			}

			contactAgent, err := agents.NewContactAgent(contactModel)
			if err != nil {
				panic(err)
			}

			image, err := imageModel.GetToProcessWithData(ctx, imageID)
			if err != nil {
				log.Println("Failed to GetToProcessWithData")
				log.Println(err)
				return
			}

			if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
				log.Println("Failed to StartProcessing")
				log.Println(err)
				return
			}

			orchestrator, err := agents.NewOrchestratorAgent(locationAgent, noteAgent, contactAgent, image.Image.ImageName, image.Image.Image)
			if err != nil {
				panic(err)
			}

			// Still need to find some way to hide this complexity away.
			// I don't think wrapping agents in structs actually works too well.
			err = orchestrator.Client.RunAgent(agents.OrchestratorPrompt, agents.OrchestratorTools, "noAction", image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image)
			if err != nil {
				log.Println(err)
			}

			// The image is marked finished even when the agent run failed.
			// NOTE(review): any result returned by FinishProcessing is discarded
			// here; if it returns an error it should be logged.
			imageModel.FinishProcessing(ctx, image.ID)
		}()
	}
}
// EventManager keeps one status channel per image that is currently being
// processed. Entries are added by ListenNewImageEvents and removed by
// ListenProcessingImageStatus once a status has been delivered.
//
// NOTE(review): the map has no synchronisation of its own; access from more
// than one goroutine needs external locking.
type EventManager struct {
	// Maps processing image UUID to a channel
	listeners map[string]chan string
}
// NewEventManager returns an EventManager whose listener map is allocated and
// empty, so entries can be added to it straight away.
func NewEventManager() EventManager {
	var manager EventManager
	manager.listeners = map[string]chan string{}
	return manager
}
// ListenProcessingImageStatus subscribes to the Postgres
// "new_processing_image_status" notification channel and forwards each status
// to the channel registered for that image in eventManager.listeners.
//
// The payload is read as the first 36 bytes being the image UUID, with the
// remainder being the status text. When a channel is registered under that
// UUID the status is sent on it, the channel is closed and its map entry is
// removed; notifications for unknown UUIDs are skipped.
//
// The function blocks for as long as the listener's Notify channel is open and
// panics when the listener callback receives an error or the initial LISTEN
// fails. The db parameter is not used by this function.
//
// NOTE(review): eventManager.listeners is also written by ListenNewImageEvents;
// if the two run on different goroutines the map needs a mutex.
func ListenProcessingImageStatus(db *sql.DB, eventManager *EventManager) {
	// Length of a UUID in its hyphenated text form, as used for the map keys.
	const uuidLen = 36

	listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
		if err != nil {
			panic(err)
		}
	})
	defer listener.Close()

	if err := listener.Listen("new_processing_image_status"); err != nil {
		panic(err)
	}

	// Ranging (instead of a single-case select) ends the loop if Notify is ever
	// closed rather than spinning on zero values.
	for notification := range listener.Notify {
		// lib/pq delivers a nil notification after it re-establishes a lost
		// connection; there is no payload to act on in that case.
		if notification == nil {
			continue
		}

		payload := notification.Extra

		// Slicing a shorter payload would panic and stop the listener.
		if len(payload) < uuidLen {
			log.Printf("Ignoring new_processing_image_status notification with malformed payload %q", payload)
			continue
		}

		stringUUID, status := payload[:uuidLen], payload[uuidLen:]

		fmt.Printf("UUID: %s\n", stringUUID)
		fmt.Printf("Receiving %s\n", payload)

		imageListener, exists := eventManager.listeners[stringUUID]
		if !exists {
			continue
		}

		// This send blocks until a receiver takes the status unless the channel
		// has free buffer space.
		imageListener <- status
		close(imageListener)
		delete(eventManager.listeners, stringUUID)
	}
}