194 lines
5.1 KiB
Go
194 lines
5.1 KiB
Go
package main
|
|
|
|
import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"screenmark/screenmark/agents"
	"screenmark/screenmark/models"
	"strconv"
	"sync"
	"time"

	"github.com/charmbracelet/log"
	"github.com/google/uuid"
	"github.com/lib/pq"
)
|
|
|
|
type Notification struct {
|
|
ImageID uuid.UUID
|
|
Status string
|
|
}
|
|
|
|
func ListenNewImageEvents(db *sql.DB, notifier *Notifier[Notification]) {
|
|
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
})
|
|
defer listener.Close()
|
|
|
|
locationModel := models.NewLocationModel(db)
|
|
eventModel := models.NewEventModel(db)
|
|
noteModel := models.NewNoteModel(db)
|
|
imageModel := models.NewImageModel(db)
|
|
contactModel := models.NewContactModel(db)
|
|
|
|
databaseEventLog := createLogger("Database Events 🤖", os.Stdout)
|
|
databaseEventLog.SetLevel(log.DebugLevel)
|
|
|
|
err := listener.Listen("new_image")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case parameters := <-listener.Notify:
|
|
imageId := uuid.MustParse(parameters.Extra)
|
|
|
|
databaseEventLog.Debug("Starting processing image", "ImageID", imageId)
|
|
|
|
ctx := context.Background()
|
|
|
|
go func() {
|
|
image, err := imageModel.GetToProcessWithData(ctx, imageId)
|
|
if err != nil {
|
|
databaseEventLog.Error("Failed to GetToProcessWithData", "error", err)
|
|
return
|
|
}
|
|
|
|
splitWriter := createDbStdoutWriter(db, image.ImageID)
|
|
|
|
noteAgent := agents.NewNoteAgent(createLogger("Notes 📝", splitWriter), noteModel)
|
|
contactAgent := agents.NewContactAgent(createLogger("Contacts 👥", splitWriter), contactModel)
|
|
locationAgent := agents.NewLocationAgent(createLogger("Locations 📍", splitWriter), locationModel)
|
|
eventAgent := agents.NewEventAgent(createLogger("Events 📅", splitWriter), eventModel, locationModel)
|
|
|
|
if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
|
|
databaseEventLog.Error("Failed to FinishProcessing", "error", err)
|
|
return
|
|
}
|
|
|
|
orchestrator := agents.NewOrchestratorAgent(createLogger("Orchestrator 🎼", splitWriter), noteAgent, contactAgent, locationAgent, eventAgent, image.Image.ImageName, image.Image.Image)
|
|
orchestrator.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image)
|
|
_, err = imageModel.FinishProcessing(ctx, image.ID)
|
|
if err != nil {
|
|
databaseEventLog.Error("Failed to finish processing", "ImageID", imageId, "error", err)
|
|
return
|
|
}
|
|
|
|
databaseEventLog.Debug("Finished processing image", "ImageID", imageId)
|
|
}()
|
|
}
|
|
}
|
|
}
|
|
|
|
func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier *Notifier[Notification]) {
|
|
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
})
|
|
defer listener.Close()
|
|
|
|
logger := createLogger("Image Status 📊", os.Stdout)
|
|
|
|
if err := listener.Listen("new_processing_image_status"); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case data := <-listener.Notify:
|
|
imageStringUuid := data.Extra[0:36]
|
|
status := data.Extra[36:]
|
|
|
|
imageUuid, err := uuid.Parse(imageStringUuid)
|
|
if err != nil {
|
|
logger.Error(err)
|
|
continue
|
|
}
|
|
|
|
processingImage, err := images.GetToProcess(context.Background(), imageUuid)
|
|
if err != nil {
|
|
logger.Error("GetToProcess failed", "err", err)
|
|
continue
|
|
}
|
|
|
|
logger.Info("Update", "id", imageStringUuid, "status", status)
|
|
|
|
notification := Notification{
|
|
ImageID: processingImage.ImageID,
|
|
Status: status,
|
|
}
|
|
|
|
if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil {
|
|
logger.Error(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
|
|
counter := 0
|
|
|
|
userSplitters := make(map[string]*ChannelSplitter[Notification])
|
|
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
userId := r.Context().Value(USER_ID).(uuid.UUID)
|
|
if userId == uuid.Nil {
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
// w.(http.Flusher).Flush()
|
|
|
|
if _, exists := notifier.Listeners[userId.String()]; !exists {
|
|
notifier.Create(userId.String())
|
|
}
|
|
|
|
userNotifications := notifier.Listeners[userId.String()]
|
|
|
|
if _, exists := userSplitters[userId.String()]; !exists {
|
|
splitter := NewChannelSplitter(userNotifications)
|
|
|
|
userSplitters[userId.String()] = &splitter
|
|
splitter.Listen()
|
|
}
|
|
|
|
splitter := userSplitters[userId.String()]
|
|
|
|
id := strconv.Itoa(counter)
|
|
counter += 1
|
|
|
|
notifications := splitter.Add(id)
|
|
defer splitter.Remove(id)
|
|
|
|
for {
|
|
select {
|
|
case <-r.Context().Done():
|
|
fmt.Fprint(w, "event: close\ndata: Connection closed\n\n")
|
|
w.(http.Flusher).Flush()
|
|
return
|
|
case msg := <-notifications:
|
|
|
|
msgString, err := json.Marshal(msg)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("Sending msg %s\n", msg)
|
|
fmt.Fprintf(w, "event: data\ndata: %s\n\n", string(msgString))
|
|
w.(http.Flusher).Flush()
|
|
}
|
|
}
|
|
}
|
|
}
|