feat: sending processing images on app load

This commit is contained in:
2025-07-02 12:04:16 +01:00
parent cb4a03015a
commit c632487d7e
3 changed files with 103 additions and 67 deletions

View File

@ -44,45 +44,42 @@ func ListenNewImageEvents(db *sql.DB, notifier *Notifier[Notification]) {
panic(err)
}
for {
select {
case parameters := <-listener.Notify:
imageId := uuid.MustParse(parameters.Extra)
for parameters := range listener.Notify {
imageId := uuid.MustParse(parameters.Extra)
databaseEventLog.Debug("Starting processing image", "ImageID", imageId)
databaseEventLog.Debug("Starting processing image", "ImageID", imageId)
ctx := context.Background()
ctx := context.Background()
go func() {
image, err := imageModel.GetToProcessWithData(ctx, imageId)
if err != nil {
databaseEventLog.Error("Failed to GetToProcessWithData", "error", err)
return
}
go func() {
image, err := imageModel.GetToProcessWithData(ctx, imageId)
if err != nil {
databaseEventLog.Error("Failed to GetToProcessWithData", "error", err)
return
}
splitWriter := createDbStdoutWriter(db, image.ImageID)
splitWriter := createDbStdoutWriter(db, image.ImageID)
noteAgent := agents.NewNoteAgent(createLogger("Notes 📝", splitWriter), noteModel)
contactAgent := agents.NewContactAgent(createLogger("Contacts 👥", splitWriter), contactModel)
locationAgent := agents.NewLocationAgent(createLogger("Locations 📍", splitWriter), locationModel)
eventAgent := agents.NewEventAgent(createLogger("Events 📅", splitWriter), eventModel, locationModel)
noteAgent := agents.NewNoteAgent(createLogger("Notes 📝", splitWriter), noteModel)
contactAgent := agents.NewContactAgent(createLogger("Contacts 👥", splitWriter), contactModel)
locationAgent := agents.NewLocationAgent(createLogger("Locations 📍", splitWriter), locationModel)
eventAgent := agents.NewEventAgent(createLogger("Events 📅", splitWriter), eventModel, locationModel)
if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
databaseEventLog.Error("Failed to FinishProcessing", "error", err)
return
}
if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
databaseEventLog.Error("Failed to FinishProcessing", "error", err)
return
}
orchestrator := agents.NewOrchestratorAgent(createLogger("Orchestrator 🎼", splitWriter), noteAgent, contactAgent, locationAgent, eventAgent, image.Image.ImageName, image.Image.Image)
orchestrator.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image)
_, err = imageModel.FinishProcessing(ctx, image.ID)
if err != nil {
databaseEventLog.Error("Failed to finish processing", "ImageID", imageId, "error", err)
return
}
orchestrator := agents.NewOrchestratorAgent(createLogger("Orchestrator 🎼", splitWriter), noteAgent, contactAgent, locationAgent, eventAgent, image.Image.ImageName, image.Image.Image)
orchestrator.RunAgent(image.UserID, image.ImageID, image.Image.ImageName, image.Image.Image)
_, err = imageModel.FinishProcessing(ctx, image.ID)
if err != nil {
databaseEventLog.Error("Failed to finish processing", "ImageID", imageId, "error", err)
return
}
databaseEventLog.Debug("Finished processing image", "ImageID", imageId)
}()
}
databaseEventLog.Debug("Finished processing image", "ImageID", imageId)
}()
}
}
@ -100,69 +97,74 @@ func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier
panic(err)
}
for {
select {
case data := <-listener.Notify:
imageStringUuid := data.Extra[0:36]
status := data.Extra[36:]
for data := range listener.Notify {
imageStringUuid := data.Extra[0:36]
status := data.Extra[36:]
imageUuid, err := uuid.Parse(imageStringUuid)
if err != nil {
logger.Error(err)
continue
}
imageUuid, err := uuid.Parse(imageStringUuid)
if err != nil {
logger.Error(err)
continue
}
processingImage, err := images.GetToProcess(context.Background(), imageUuid)
if err != nil {
logger.Error("GetToProcess failed", "err", err)
continue
}
processingImage, err := images.GetToProcess(context.Background(), imageUuid)
if err != nil {
logger.Error("GetToProcess failed", "err", err)
continue
}
logger.Info("Update", "id", imageStringUuid, "status", status)
logger.Info("Update", "id", imageStringUuid, "status", status)
notification := Notification{
ImageID: processingImage.ImageID,
Status: status,
}
notification := Notification{
ImageID: processingImage.ImageID,
Status: status,
}
if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil {
logger.Error(err)
}
if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil {
logger.Error(err)
}
}
}
/*
* TODO: We have channels open every a user sends an image.
* We never close these channels.
*
* What is a reasonable default? Close the channel after 1 minute of inactivity?
*/
func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
counter := 0
userSplitters := make(map[string]*ChannelSplitter[Notification])
return func(w http.ResponseWriter, r *http.Request) {
userId := r.Context().Value(USER_ID).(uuid.UUID)
if userId == uuid.Nil {
_userId := r.Context().Value(USER_ID).(uuid.UUID)
if _userId == uuid.Nil {
w.WriteHeader(http.StatusUnauthorized)
return
}
userId := _userId.String()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// w.(http.Flusher).Flush()
if _, exists := notifier.Listeners[userId.String()]; !exists {
notifier.Create(userId.String())
if _, exists := notifier.Listeners[userId]; !exists {
notifier.Create(userId)
}
userNotifications := notifier.Listeners[userId.String()]
userNotifications := notifier.Listeners[userId]
if _, exists := userSplitters[userId.String()]; !exists {
if _, exists := userSplitters[userId]; !exists {
splitter := NewChannelSplitter(userNotifications)
userSplitters[userId.String()] = &splitter
userSplitters[userId] = &splitter
splitter.Listen()
}
splitter := userSplitters[userId.String()]
splitter := userSplitters[userId]
id := strconv.Itoa(counter)
counter += 1

View File

@ -135,14 +135,24 @@ func main() {
return
}
processingImages, err := imageModel.GetProcessing(r.Context(), userId)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusNotFound)
fmt.Fprintf(w, "Something went wrong")
return
}
type ImagesReturn struct {
UserImages []models.UserImageWithImage
ImageProperties []models.TypedProperties
UserImages []models.UserImageWithImage
ImageProperties []models.TypedProperties
ProcessingImages []models.UserProcessingImage
}
imagesReturn := ImagesReturn{
UserImages: images,
ImageProperties: models.GetTypedImageProperties(imageProperties),
UserImages: images,
ImageProperties: models.GetTypedImageProperties(imageProperties),
ProcessingImages: processingImages,
}
jsonImages, err := json.Marshal(imagesReturn)

View File

@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"screenmark/screenmark/.gen/haystack/haystack/enum"
"screenmark/screenmark/.gen/haystack/haystack/model"
. "screenmark/screenmark/.gen/haystack/haystack/table"
@ -29,6 +30,12 @@ type ProcessingImageData struct {
Image model.Image
}
type UserProcessingImage struct {
model.UserImagesToProcess
Image model.Image
}
func (m ImageModel) Process(ctx context.Context, userId uuid.UUID, image model.Image) (model.UserImagesToProcess, error) {
tx, err := m.dbPool.BeginTx(ctx, nil)
if err != nil {
@ -89,7 +96,7 @@ func (m ImageModel) GetToProcessWithData(ctx context.Context, imageId uuid.UUID)
err := stmt.QueryContext(ctx, m.dbPool, &images)
if len(images) != 1 {
return ProcessingImageData{}, errors.New(fmt.Sprintf("Expected 1, got %d\n", len(images)))
return ProcessingImageData{}, fmt.Errorf("Expected 1, got %d\n", len(images))
}
return images[0], err
@ -171,6 +178,23 @@ func (m ImageModel) Get(ctx context.Context, imageId uuid.UUID) (model.Image, er
return image, err
}
func (m ImageModel) GetProcessing(ctx context.Context, userId uuid.UUID) ([]UserProcessingImage, error) {
getProcessingStmt := SELECT(UserImagesToProcess.AllColumns, Image.ID, Image.ImageName).
FROM(
UserImagesToProcess.INNER_JOIN(
Image, Image.ID.EQ(UserImagesToProcess.ImageID),
),
).WHERE(
UserImagesToProcess.UserID.EQ(UUID(userId)).
AND(UserImagesToProcess.Status.NOT_EQ(enum.Progress.Complete)),
)
images := []UserProcessingImage{}
err := getProcessingStmt.QueryContext(ctx, m.dbPool, &images)
return images, err
}
func (m ImageModel) IsUserAuthorized(ctx context.Context, imageId uuid.UUID, userId uuid.UUID) bool {
getImageUserId := UserImages.SELECT(UserImages.UserID).WHERE(UserImages.ImageID.EQ(UUID(imageId)))