package main import ( "context" "database/sql" "os" "screenmark/screenmark/agents" "screenmark/screenmark/models" "time" "github.com/charmbracelet/log" "github.com/google/uuid" "github.com/lib/pq" ) func ListenNewImageEvents(db *sql.DB, eventManager *EventManager) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { if err != nil { panic(err) } }) defer listener.Close() locationModel := models.NewLocationModel(db) eventModel := models.NewEventModel(db) noteModel := models.NewNoteModel(db) imageModel := models.NewImageModel(db) contactModel := models.NewContactModel(db) databaseEventLog := createLogger("Database Events 🤖", os.Stdout) databaseEventLog.SetLevel(log.DebugLevel) err := listener.Listen("new_image") if err != nil { panic(err) } for { select { case parameters := <-listener.Notify: imageId := uuid.MustParse(parameters.Extra) eventManager.listeners[parameters.Extra] = make(chan string) databaseEventLog.Debug("Starting processing image", "ImageID", imageId) ctx := context.Background() go func() { image, err := imageModel.GetToProcessWithData(ctx, imageId) if err != nil { databaseEventLog.Error("Failed to GetToProcessWithData", "error", err) return } splitWriter := createDbStdoutWriter(db, image.ImageID) noteAgent := agents.NewNoteAgent(createLogger("Notes 📝", splitWriter), noteModel) contactAgent := agents.NewContactAgent(createLogger("Contacts 👥", splitWriter), contactModel) locationAgent := agents.NewLocationAgent(createLogger("Locations 📍", splitWriter), locationModel) eventAgent := agents.NewEventAgent(createLogger("Events 📅", splitWriter), eventModel, locationModel) if err := imageModel.StartProcessing(ctx, image.ID); err != nil { databaseEventLog.Error("Failed to FinishProcessing", "error", err) return } orchestrator := agents.NewOrchestratorAgent(createLogger("Orchestrator 🎼", splitWriter), noteAgent, contactAgent, locationAgent, eventAgent, image.Image.ImageName, image.Image.Image) err = orchestrator.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image) if err != nil { databaseEventLog.Error("Orchestrator failed", "error", err) return } _, err = imageModel.FinishProcessing(ctx, image.ID) if err != nil { databaseEventLog.Error("Failed to finish processing", "ImageID", imageId) return } databaseEventLog.Debug("Starting processing image", "ImageID", imageId) }() } } } type EventManager struct { // Maps processing image UUID to a channel listeners map[string]chan string } func NewEventManager() EventManager { return EventManager{ listeners: make(map[string]chan string), } } func ListenProcessingImageStatus(db *sql.DB, eventManager *EventManager) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { if err != nil { panic(err) } }) defer listener.Close() if err := listener.Listen("new_processing_image_status"); err != nil { panic(err) } for { select { case data := <-listener.Notify: stringUuid := data.Extra[0:36] status := data.Extra[36:] imageListener, exists := eventManager.listeners[stringUuid] if !exists { continue } imageListener <- status close(imageListener) delete(eventManager.listeners, stringUuid) } } }