Haystack/backend/events.go
John Costa 8b6b9453a8 refactor: creating image process to handle processing of images
Decoupling this from the DB, it's a good step.

Not yet perfect however.
2025-09-14 17:42:16 +01:00

290 lines
7.2 KiB
Go

package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"
"os"
"screenmark/screenmark/agents"
"screenmark/screenmark/imageprocessor"
"screenmark/screenmark/limits"
"screenmark/screenmark/middleware"
"screenmark/screenmark/models"
"screenmark/screenmark/notifier"
"strconv"
"sync"
"time"
"github.com/charmbracelet/log"
"github.com/google/uuid"
"github.com/lib/pq"
)
func ListenNewImageEvents(db *sql.DB) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil {
panic(err)
}
})
defer listener.Close()
imageModel := models.NewImageModel(db)
listModel := models.NewListModel(db)
limits := limits.CreateLimitsManager(db)
databaseEventLog := createLogger("Database Events 🤖", os.Stdout)
databaseEventLog.SetLevel(log.DebugLevel)
err := listener.Listen("new_image")
if err != nil {
panic(err)
}
for parameters := range listener.Notify {
imageId := uuid.MustParse(parameters.Extra)
databaseEventLog.Debug("Starting processing image", "ImageID", imageId)
ctx := context.Background()
go func() {
image, err := imageModel.GetToProcessWithData(ctx, imageId)
if err != nil {
databaseEventLog.Error("Failed to GetToProcessWithData", "error", err)
return
}
splitWriter := createDbStdoutWriter(db, image.ImageID)
if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
databaseEventLog.Error("Failed to FinishProcessing", "error", err)
return
}
descriptionAgent := agents.NewDescriptionAgent(createLogger("Description 📝", splitWriter), imageModel)
listAgent := agents.NewListAgent(createLogger("Lists 🖋️", splitWriter), listModel, limits)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
descriptionAgent.Describe(createLogger("Description 📓", splitWriter), image.Image.ID, image.Image.ImageName, image.Image.Image)
}()
go func() {
defer wg.Done()
listAgent.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image)
}()
wg.Wait()
_, err = imageModel.FinishProcessing(ctx, image.ID)
if err != nil {
databaseEventLog.Error("Failed to finish processing", "ImageID", imageId, "error", err)
return
}
databaseEventLog.Debug("Finished processing image", "ImageID", imageId)
}()
}
}
func getProcessingImageStatusChannel(db *sql.DB) chan string {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil {
panic(err)
}
})
defer listener.Close()
if err := listener.Listen("new_processing_image_status"); err != nil {
panic(err)
}
msgChan := make(chan string)
go func() {
for data := range listener.Notify {
msgChan <- data.Extra
}
}()
return msgChan
}
func ListenNewStackEvents(db *sql.DB) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil {
panic(err)
}
})
defer listener.Close()
stackModel := models.NewListModel(db)
newStacksLogger := createLogger("New Stacks 🤖", os.Stdout)
newStacksLogger.SetLevel(log.DebugLevel)
err := listener.Listen("new_stack")
if err != nil {
panic(err)
}
for parameters := range listener.Notify {
stackID := uuid.MustParse(parameters.Extra)
newStacksLogger.Debug("Starting processing stack", "StackID", stackID)
ctx := context.Background()
go func() {
stack, err := stackModel.GetProcessing(ctx, stackID)
if err != nil {
newStacksLogger.Error("failed to get processing", "error", err)
return
}
if err := stackModel.StartProcessing(ctx, stackID); err != nil {
newStacksLogger.Error("failed to start processing", "error", err)
return
}
listAgent := agents.NewCreateListAgent(newStacksLogger, stackModel)
userListRequest := fmt.Sprintf("title=%s,fields=%s", stack.Title, stack.Fields)
err = listAgent.CreateList(newStacksLogger, stack.UserID, userListRequest)
if err != nil {
newStacksLogger.Error("running agent", "err", err)
return
}
if err := stackModel.EndProcessing(ctx, stackID); err != nil {
newStacksLogger.Error("failed to finish processing", "error", err)
return
}
newStacksLogger.Debug("Finished processing stack", "StackID", stackID)
}()
}
}
func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *notifier.Notifier[imageprocessor.Notification]) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil {
panic(err)
}
})
defer listener.Close()
logger := createLogger("Stack Status 📊", os.Stdout)
if err := listener.Listen("new_processing_stack_status"); err != nil {
panic(err)
}
for data := range listener.Notify {
stackStringUUID := data.Extra[0:36]
status := data.Extra[36:]
stackUUID, err := uuid.Parse(stackStringUUID)
if err != nil {
logger.Error(err)
continue
}
processingStack, err := stacks.GetToProcess(context.Background(), stackUUID)
if err != nil {
logger.Error("GetToProcess failed", "err", err)
continue
}
logger.Info("Update", "id", stackStringUUID, "status", status)
notification := imageprocessor.GetListNotification(imageprocessor.ListNotification{
Type: imageprocessor.LIST_TYPE,
Name: processingStack.Title,
ListID: stackUUID,
Status: status,
})
if err := notifier.SendAndCreate(processingStack.UserID.String(), notification); err != nil {
logger.Error(err)
}
}
}
/*
* TODO: We have channels open every a user sends an image.
* We never close these channels.
*
* What is a reasonable default? Close the channel after 1 minute of inactivity?
*/
func CreateEventsHandler(notifierr *notifier.Notifier[imageprocessor.Notification]) http.HandlerFunc {
counter := 0
userSplitters := make(map[string]*notifier.ChannelSplitter[imageprocessor.Notification])
return func(w http.ResponseWriter, r *http.Request) {
_userId := r.Context().Value(middleware.USER_ID).(uuid.UUID)
if _userId == uuid.Nil {
w.WriteHeader(http.StatusUnauthorized)
return
}
userId := _userId.String()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// w.(http.Flusher).Flush()
if _, exists := notifierr.Listeners[userId]; !exists {
notifierr.Create(userId)
}
userNotifications := notifierr.Listeners[userId]
if _, exists := userSplitters[userId]; !exists {
splitter := notifier.NewChannelSplitter(userNotifications)
userSplitters[userId] = &splitter
splitter.Listen()
}
splitter := userSplitters[userId]
id := strconv.Itoa(counter)
counter += 1
notifications := splitter.Add(id)
defer splitter.Remove(id)
for {
select {
case <-r.Context().Done():
fmt.Fprint(w, "event: close\ndata: Connection closed\n\n")
w.(http.Flusher).Flush()
return
case msg := <-notifications:
msgString, err := json.Marshal(msg)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
fmt.Printf("Sending msg %s\n", msgString)
fmt.Fprintf(w, "event: data\ndata: %s\n\n", string(msgString))
w.(http.Flusher).Flush()
}
}
}
}