Haystack/backend/events.go

108 lines
3.1 KiB
Go

package main
import (
"context"
"database/sql"
"os"
"screenmark/screenmark/agents"
"screenmark/screenmark/models"
"time"
"github.com/charmbracelet/log"
"github.com/google/uuid"
"github.com/lib/pq"
)
func ListenNewImageEvents(db *sql.DB, notifier *Notifier[string]) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil {
panic(err)
}
})
defer listener.Close()
locationModel := models.NewLocationModel(db)
eventModel := models.NewEventModel(db)
noteModel := models.NewNoteModel(db)
imageModel := models.NewImageModel(db)
contactModel := models.NewContactModel(db)
databaseEventLog := createLogger("Database Events 🤖", os.Stdout)
databaseEventLog.SetLevel(log.DebugLevel)
err := listener.Listen("new_image")
if err != nil {
panic(err)
}
for {
select {
case parameters := <-listener.Notify:
imageId := uuid.MustParse(parameters.Extra)
databaseEventLog.Debug("Starting processing image", "ImageID", imageId)
ctx := context.Background()
go func() {
image, err := imageModel.GetToProcessWithData(ctx, imageId)
if err != nil {
databaseEventLog.Error("Failed to GetToProcessWithData", "error", err)
return
}
splitWriter := createDbStdoutWriter(db, image.ImageID)
noteAgent := agents.NewNoteAgent(createLogger("Notes 📝", splitWriter), noteModel)
contactAgent := agents.NewContactAgent(createLogger("Contacts 👥", splitWriter), contactModel)
locationAgent := agents.NewLocationAgent(createLogger("Locations 📍", splitWriter), locationModel)
eventAgent := agents.NewEventAgent(createLogger("Events 📅", splitWriter), eventModel, locationModel)
if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
databaseEventLog.Error("Failed to FinishProcessing", "error", err)
return
}
orchestrator := agents.NewOrchestratorAgent(createLogger("Orchestrator 🎼", splitWriter), noteAgent, contactAgent, locationAgent, eventAgent, image.Image.ImageName, image.Image.Image)
orchestrator.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image)
_, err = imageModel.FinishProcessing(ctx, image.ID)
if err != nil {
databaseEventLog.Error("Failed to finish processing", "ImageID", imageId, "error", err)
return
}
databaseEventLog.Debug("Finished processing image", "ImageID", imageId)
}()
}
}
}
func ListenProcessingImageStatus(db *sql.DB, notifier *Notifier[string]) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil {
panic(err)
}
})
defer listener.Close()
logger := createLogger("Image Status 📊", os.Stdout)
if err := listener.Listen("new_processing_image_status"); err != nil {
panic(err)
}
for {
select {
case data := <-listener.Notify:
stringUuid := data.Extra[0:36]
status := data.Extra[36:]
logger.Info("Update", "id", stringUuid, "status", status)
if err := notifier.SendAndCreate(stringUuid, status); err != nil {
logger.Error(err)
}
}
}
}