package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/charmbracelet/log"
	"github.com/google/uuid"
	"github.com/lib/pq"

	"screenmark/screenmark/agents"
	"screenmark/screenmark/models"
)

// Notification is the payload pushed to a user's event stream when the
// processing status of one of their images changes.
type Notification struct {
	// ImageID identifies the image the status refers to.
	ImageID uuid.UUID
	// ImageName is the name of that image.
	ImageName string
	// Status is the status text taken from the database notification payload.
	Status string
}

// ListenNewImageEvents subscribes to the "new_image" Postgres channel and, for
// every notification, processes the referenced image in its own goroutine:
// it marks the image as processing, runs the description agent and the list
// agent, and finally marks processing as finished.
//
// The function blocks for as long as the listener's Notify channel is open.
// It panics if the listener reports a connection error or the initial LISTEN
// fails. The notifier parameter is currently unused.
func ListenNewImageEvents(db *sql.DB, notifier *Notifier[Notification]) {
	listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
		if err != nil {
			panic(err)
		}
	})
	defer listener.Close()

	imageModel := models.NewImageModel(db)
	listModel := models.NewListModel(db)

	databaseEventLog := createLogger("Database Events 🤖", os.Stdout)
	databaseEventLog.SetLevel(log.DebugLevel)

	if err := listener.Listen("new_image"); err != nil {
		panic(err)
	}

	// process runs the full pipeline for a single image. Each failure is
	// logged; only the description step is allowed to fail without aborting
	// the rest of the pipeline.
	process := func(imageID uuid.UUID) {
		ctx := context.Background()

		image, err := imageModel.GetToProcessWithData(ctx, imageID)
		if err != nil {
			databaseEventLog.Error("Failed to GetToProcessWithData", "error", err)
			return
		}

		splitWriter := createDbStdoutWriter(db, image.ImageID)

		if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
			databaseEventLog.Error("Failed to StartProcessing", "error", err)
			return
		}

		descriptionAgent := agents.NewDescriptionAgent(createLogger("Description 📝", splitWriter), imageModel)

		err = descriptionAgent.Describe(createLogger("Description 📓", splitWriter), image.Image.ID, image.Image.ImageName, image.Image.Image)
		if err != nil {
			// A failed description does not stop the list agent from running.
			databaseEventLog.Error("Failed to describe image", "ImageID", imageID, "error", err)
		}

		listAgent := agents.NewListAgent(createLogger("Lists 🖋️", splitWriter), listModel)
		listAgent.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image)

		if _, err := imageModel.FinishProcessing(ctx, image.ID); err != nil {
			databaseEventLog.Error("Failed to finish processing", "ImageID", imageID, "error", err)
			return
		}

		databaseEventLog.Debug("Finished processing image", "ImageID", imageID)
	}

	for parameters := range listener.Notify {
		// lib/pq delivers a nil notification after the connection has been
		// re-established; there is no payload to act on.
		if parameters == nil {
			continue
		}

		imageID, err := uuid.Parse(parameters.Extra)
		if err != nil {
			// A malformed payload must not take the whole listener down.
			databaseEventLog.Error("Invalid image id in new_image payload", "payload", parameters.Extra, "error", err)
			continue
		}

		databaseEventLog.Debug("Starting processing image", "ImageID", imageID)

		go process(imageID)
	}
}

// ListenProcessingImageStatus subscribes to the "new_processing_image_status"
// Postgres channel and forwards every status change to the owning user through
// notifier.
//
// The notification payload is read as a 36-character image UUID immediately
// followed by the status text. Payloads that are too short or do not start
// with a valid UUID are logged and skipped.
//
// The function blocks for as long as the listener's Notify channel is open.
// It panics if the listener reports a connection error or the initial LISTEN
// fails. The db parameter is currently unused.
func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier *Notifier[Notification]) {
	// Length of the canonical textual UUID form that prefixes the payload.
	const uuidStringLen = 36

	listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
		if err != nil {
			panic(err)
		}
	})
	defer listener.Close()

	logger := createLogger("Image Status 📊", os.Stdout)

	if err := listener.Listen("new_processing_image_status"); err != nil {
		panic(err)
	}

	for data := range listener.Notify {
		// lib/pq delivers a nil notification after the connection has been
		// re-established; there is no payload to act on.
		if data == nil {
			continue
		}

		if len(data.Extra) < uuidStringLen {
			logger.Error("Payload too short for an image id", "payload", data.Extra)
			continue
		}

		imageStringUuid := data.Extra[:uuidStringLen]
		status := data.Extra[uuidStringLen:]

		imageUuid, err := uuid.Parse(imageStringUuid)
		if err != nil {
			logger.Error(err)
			continue
		}

		processingImage, err := images.GetToProcess(context.Background(), imageUuid)
		if err != nil {
			logger.Error("GetToProcess failed", "err", err)
			continue
		}

		logger.Info("Update", "id", imageStringUuid, "status", status)

		notification := Notification{
			ImageID:   processingImage.ImageID,
			ImageName: processingImage.Image.ImageName,
			Status:    status,
		}

		if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil {
			logger.Error(err)
		}
	}
}

/*
 * TODO: We open a channel every time a user sends an image.
 * We never close these channels.
 *
 * What is a reasonable default? Close the channel after 1 minute of inactivity?
 */

// CreateEventsHandler returns an HTTP handler that streams a user's
// notifications as server-sent events.
//
// The user is taken from the request context under the USER_ID key; requests
// without a valid, non-nil user ID get 401 Unauthorized. Each connection is
// registered as its own subscriber on a per-user ChannelSplitter, so several
// connections of the same user can be open at once. The stream ends when the
// request context is cancelled or the subscriber channel is closed.
func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
	// mu guards counter and userSplitters, which are shared by every request
	// goroutine that runs the returned handler.
	var mu sync.Mutex
	counter := 0
	userSplitters := make(map[string]*ChannelSplitter[Notification])

	// subscribe returns the splitter for userID, creating the user's listener
	// and splitter on first use, together with a fresh subscriber id.
	// NOTE(review): notifier.Listeners may also be touched from other
	// goroutines; mu only serialises the handlers among themselves.
	subscribe := func(userID string) (*ChannelSplitter[Notification], string) {
		mu.Lock()
		defer mu.Unlock()

		if _, exists := notifier.Listeners[userID]; !exists {
			notifier.Create(userID)
		}

		splitter, exists := userSplitters[userID]
		if !exists {
			created := NewChannelSplitter(notifier.Listeners[userID])
			splitter = &created
			userSplitters[userID] = splitter
			splitter.Listen()
		}

		id := strconv.Itoa(counter)
		counter++

		return splitter, id
	}

	return func(w http.ResponseWriter, r *http.Request) {
		// A missing or wrongly typed context value is treated like an
		// unauthenticated request instead of panicking.
		userUUID, ok := r.Context().Value(USER_ID).(uuid.UUID)
		if !ok || userUUID == uuid.Nil {
			w.WriteHeader(http.StatusUnauthorized)
			return
		}

		// Streaming needs a flushable writer; check once up front.
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}

		userID := userUUID.String()

		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")
		// flusher.Flush()

		splitter, id := subscribe(userID)

		notifications := splitter.Add(id)
		defer splitter.Remove(id)

		for {
			select {
			case <-r.Context().Done():
				fmt.Fprint(w, "event: close\ndata: Connection closed\n\n")
				flusher.Flush()
				return
			case msg, open := <-notifications:
				if !open {
					// A closed channel would otherwise yield zero values forever.
					return
				}

				msgString, err := json.Marshal(msg)
				if err != nil {
					// Only takes effect if nothing has been written yet.
					w.WriteHeader(http.StatusInternalServerError)
					return
				}

				fmt.Printf("Sending msg %s\n", msg)

				fmt.Fprintf(w, "event: data\ndata: %s\n\n", string(msgString))
				flusher.Flush()
			}
		}
	}
}