package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"screenmark/screenmark/agents"
	"screenmark/screenmark/models"
	"strconv"
	"sync"
	"time"

	"github.com/charmbracelet/log"
	"github.com/google/uuid"
	"github.com/lib/pq"
)

// Notification is the message delivered to a user's event stream. It pairs an
// image ID with the status string taken from a processing-status notification.
type Notification struct {
	ImageID uuid.UUID
	Status  string
}

// statusPayloadIDLen is the length of the canonical textual UUID that prefixes
// every "new_processing_image_status" payload.
const statusPayloadIDLen = 36

// splitStatusPayload splits a "new_processing_image_status" payload into its
// leading 36-character image UUID and the trailing status text. ok is false
// when the payload is too short to contain a UUID.
func splitStatusPayload(extra string) (id string, status string, ok bool) {
	if len(extra) < statusPayloadIDLen {
		return "", "", false
	}
	return extra[:statusPayloadIDLen], extra[statusPayloadIDLen:], true
}

// ListenNewImageEvents subscribes to the "new_image" Postgres channel and, for
// every notification, starts a goroutine that loads the image, runs the
// orchestrator agent over it and marks the image as finished.
//
// The function only returns if the listener's Notify channel is closed. It
// panics if the subscription cannot be established.
//
// notifier is accepted for signature compatibility; it is not used here.
func ListenNewImageEvents(db *sql.DB, notifier *Notifier[Notification]) {
	// NOTE(review): panicking in the event callback terminates the process on
	// any listener error, including transient disconnects. Kept as-is because
	// it may be a deliberate fail-fast choice — confirm.
	listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
		if err != nil {
			panic(err)
		}
	})
	defer listener.Close()

	locationModel := models.NewLocationModel(db)
	eventModel := models.NewEventModel(db)
	noteModel := models.NewNoteModel(db)
	imageModel := models.NewImageModel(db)
	contactModel := models.NewContactModel(db)

	databaseEventLog := createLogger("Database Events 🤖", os.Stdout)
	databaseEventLog.SetLevel(log.DebugLevel)

	if err := listener.Listen("new_image"); err != nil {
		panic(err)
	}

	for parameters := range listener.Notify {
		// lib/pq delivers a nil notification after it re-establishes a lost
		// connection; there is no payload to process in that case.
		if parameters == nil {
			databaseEventLog.Debug("Listener reconnected; notifications may have been missed")
			continue
		}

		// A malformed payload must not bring the whole listener down.
		imageId, err := uuid.Parse(parameters.Extra)
		if err != nil {
			databaseEventLog.Error("Invalid image id in notification", "payload", parameters.Extra, "error", err)
			continue
		}

		databaseEventLog.Debug("Starting processing image", "ImageID", imageId)

		ctx := context.Background()

		go func() {
			image, err := imageModel.GetToProcessWithData(ctx, imageId)
			if err != nil {
				databaseEventLog.Error("Failed to GetToProcessWithData", "error", err)
				return
			}

			// Every agent logger writes through the same writer, which is
			// created per image from the database handle and the image ID.
			splitWriter := createDbStdoutWriter(db, image.ImageID)

			noteAgent := agents.NewNoteAgent(createLogger("Notes 📝", splitWriter), noteModel)
			contactAgent := agents.NewContactAgent(createLogger("Contacts 👥", splitWriter), contactModel)
			locationAgent := agents.NewLocationAgent(createLogger("Locations 📍", splitWriter), locationModel)
			eventAgent := agents.NewEventAgent(createLogger("Events 📅", splitWriter), eventModel, locationModel)

			if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
				databaseEventLog.Error("Failed to StartProcessing", "error", err)
				return
			}

			orchestrator := agents.NewOrchestratorAgent(createLogger("Orchestrator 🎼", splitWriter), noteAgent, contactAgent, locationAgent, eventAgent, image.Image.ImageName, image.Image.Image)
			orchestrator.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image)

			if _, err := imageModel.FinishProcessing(ctx, image.ID); err != nil {
				databaseEventLog.Error("Failed to finish processing", "ImageID", imageId, "error", err)
				return
			}

			databaseEventLog.Debug("Finished processing image", "ImageID", imageId)
		}()
	}
}

// ListenProcessingImageStatus subscribes to the "new_processing_image_status"
// Postgres channel and forwards every status change to the owning user through
// notifier.
//
// Each payload is expected to be a 36-character image UUID immediately
// followed by the status text. Payloads that are too short or do not start
// with a valid UUID are logged and skipped.
//
// The function only returns if the listener's Notify channel is closed. It
// panics if the subscription cannot be established.
//
// db is accepted for signature compatibility; it is not used here.
func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier *Notifier[Notification]) {
	// NOTE(review): panicking in the event callback terminates the process on
	// any listener error, including transient disconnects — confirm intended.
	listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
		if err != nil {
			panic(err)
		}
	})
	defer listener.Close()

	logger := createLogger("Image Status 📊", os.Stdout)

	if err := listener.Listen("new_processing_image_status"); err != nil {
		panic(err)
	}

	for data := range listener.Notify {
		// lib/pq delivers a nil notification after it re-establishes a lost
		// connection; there is no payload to process in that case.
		if data == nil {
			logger.Info("Listener reconnected; notifications may have been missed")
			continue
		}

		imageStringUuid, status, ok := splitStatusPayload(data.Extra)
		if !ok {
			logger.Error("Status payload too short", "payload", data.Extra)
			continue
		}

		imageUuid, err := uuid.Parse(imageStringUuid)
		if err != nil {
			logger.Error(err)
			continue
		}

		processingImage, err := images.GetToProcess(context.Background(), imageUuid)
		if err != nil {
			logger.Error("GetToProcess failed", "err", err)
			continue
		}

		logger.Info("Update", "id", imageStringUuid, "status", status)

		notification := Notification{
			ImageID: processingImage.ImageID,
			Status:  status,
		}

		if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil {
			logger.Error(err)
		}
	}
}

// CreateEventsHandler returns an HTTP handler that streams a user's
// notifications as server-sent events.
//
// The user ID is read from the request context under USER_ID; a missing,
// mistyped or nil ID yields 401. All connections of one user share a single
// ChannelSplitter fed from that user's notifier channel, and each connection
// registers with the splitter under its own ID for as long as the request is
// alive.
func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
	// Handlers run concurrently, so the state captured by the closure has to
	// be guarded: unsynchronized map writes are fatal in Go.
	var mu sync.Mutex
	counter := 0
	userSplitters := make(map[string]*ChannelSplitter[Notification])

	return func(w http.ResponseWriter, r *http.Request) {
		userId, ok := r.Context().Value(USER_ID).(uuid.UUID)
		if !ok || userId == uuid.Nil {
			w.WriteHeader(http.StatusUnauthorized)
			return
		}

		// Streaming is impossible without flushing; check once up front
		// instead of panicking on the first event.
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")
		// w.(http.Flusher).Flush()

		userKey := userId.String()

		mu.Lock()
		// NOTE(review): this lock only serializes handler-side access to
		// notifier.Listeners; whether Notifier synchronizes its own writes
		// is not visible from here — confirm.
		if _, exists := notifier.Listeners[userKey]; !exists {
			notifier.Create(userKey)
		}
		userNotifications := notifier.Listeners[userKey]

		splitter, exists := userSplitters[userKey]
		if !exists {
			created := NewChannelSplitter(userNotifications)
			splitter = &created
			userSplitters[userKey] = splitter
			// NOTE(review): assumes Listen returns promptly (e.g. starts its
			// own goroutine); otherwise the handler could never proceed.
			splitter.Listen()
		}

		id := strconv.Itoa(counter)
		counter++
		mu.Unlock()

		notifications := splitter.Add(id)
		defer splitter.Remove(id)

		for {
			select {
			case <-r.Context().Done():
				fmt.Fprint(w, "event: close\ndata: Connection closed\n\n")
				flusher.Flush()
				return
			case msg, open := <-notifications:
				// A closed channel would otherwise yield zero values forever.
				if !open {
					return
				}

				msgString, err := json.Marshal(msg)
				if err != nil {
					// Only affects the status if no event has been written yet.
					w.WriteHeader(http.StatusInternalServerError)
					return
				}

				fmt.Printf("Sending msg %s\n", msg)

				fmt.Fprintf(w, "event: data\ndata: %s\n\n", string(msgString))
				flusher.Flush()
			}
		}
	}
}