This commit is contained in:
2025-04-13 16:20:32 +02:00
10 changed files with 197 additions and 6 deletions

View File

@ -0,0 +1,18 @@
//
// Code generated by go-jet DO NOT EDIT.
//
// WARNING: Changes to this file may cause incorrect behavior
// and will be lost if the code is regenerated
//
package enum
import "github.com/go-jet/jet/v2/postgres"
var Progress = &struct {
NotStarted postgres.StringExpression
InProgress postgres.StringExpression
}{
NotStarted: postgres.NewEnumValue("not-started"),
InProgress: postgres.NewEnumValue("in-progress"),
}

View File

@ -0,0 +1,49 @@
//
// Code generated by go-jet DO NOT EDIT.
//
// WARNING: Changes to this file may cause incorrect behavior
// and will be lost if the code is regenerated
//
package model
import "errors"
type Progress string
const (
Progress_NotStarted Progress = "not-started"
Progress_InProgress Progress = "in-progress"
)
var ProgressAllValues = []Progress{
Progress_NotStarted,
Progress_InProgress,
}
func (e *Progress) Scan(value interface{}) error {
var enumValue string
switch val := value.(type) {
case string:
enumValue = val
case []byte:
enumValue = string(val)
default:
return errors.New("jet: Invalid scan value for AllTypesEnum enum. Enum value has to be of type string or []byte")
}
switch enumValue {
case "not-started":
*e = Progress_NotStarted
case "in-progress":
*e = Progress_InProgress
default:
return errors.New("jet: Invalid scan value '" + enumValue + "' for Progress enum")
}
return nil
}
func (e Progress) String() string {
return string(e)
}

View File

@ -13,6 +13,7 @@ import (
type UserImagesToProcess struct {
ID uuid.UUID `sql:"primary_key"`
Status Progress
ImageID uuid.UUID
UserID uuid.UUID
}

View File

@ -18,6 +18,7 @@ type userImagesToProcessTable struct {
// Columns
ID postgres.ColumnString
Status postgres.ColumnString
ImageID postgres.ColumnString
UserID postgres.ColumnString
@ -61,10 +62,11 @@ func newUserImagesToProcessTable(schemaName, tableName, alias string) *UserImage
func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userImagesToProcessTable {
var (
IDColumn = postgres.StringColumn("id")
StatusColumn = postgres.StringColumn("status")
ImageIDColumn = postgres.StringColumn("image_id")
UserIDColumn = postgres.StringColumn("user_id")
allColumns = postgres.ColumnList{IDColumn, ImageIDColumn, UserIDColumn}
mutableColumns = postgres.ColumnList{ImageIDColumn, UserIDColumn}
allColumns = postgres.ColumnList{IDColumn, StatusColumn, ImageIDColumn, UserIDColumn}
mutableColumns = postgres.ColumnList{StatusColumn, ImageIDColumn, UserIDColumn}
)
return userImagesToProcessTable{
@ -72,6 +74,7 @@ func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userIm
//Columns
ID: IDColumn,
Status: StatusColumn,
ImageID: ImageIDColumn,
UserID: UserIDColumn,

View File

@ -142,6 +142,7 @@ func (client AgentClient) Request(req *AgentRequestBody) (AgentResponse, error)
}
if len(agentResponse.Choices) != 1 {
client.Log.Errorf("Received more than 1 choice from AI \n %s\n", string(response))
return AgentResponse{}, errors.New("Unsupported. We currently only accept 1 choice from AI.")
}

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"screenmark/screenmark/agents"
@ -13,7 +14,7 @@ import (
"github.com/lib/pq"
)
func ListenNewImageEvents(db *sql.DB) {
func ListenNewImageEvents(db *sql.DB, eventManager *EventManager) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil {
panic(err)
@ -36,6 +37,7 @@ func ListenNewImageEvents(db *sql.DB) {
select {
case parameters := <-listener.Notify:
imageId := uuid.MustParse(parameters.Extra)
eventManager.listeners[parameters.Extra] = make(chan string)
ctx := context.Background()
@ -62,8 +64,7 @@ func ListenNewImageEvents(db *sql.DB) {
return
}
_, err = imageModel.FinishProcessing(ctx, image.ID)
if err != nil {
if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
log.Println("Failed to FinishProcessing")
log.Println(err)
return
@ -80,7 +81,54 @@ func ListenNewImageEvents(db *sql.DB) {
if err != nil {
log.Println(err)
}
imageModel.FinishProcessing(ctx, image.ID)
}()
}
}
}
type EventManager struct {
// Maps processing image UUID to a channel
listeners map[string]chan string
}
func NewEventManager() EventManager {
return EventManager{
listeners: make(map[string]chan string),
}
}
func ListenProcessingImageStatus(db *sql.DB, eventManager *EventManager) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil {
panic(err)
}
})
defer listener.Close()
if err := listener.Listen("new_processing_image_status"); err != nil {
panic(err)
}
for {
select {
case data := <-listener.Notify:
stringUuid := data.Extra[0:36]
status := data.Extra[36:]
fmt.Printf("UUID: %s\n", stringUuid)
fmt.Printf("Receiving :s\n", data.Extra)
imageListener, exists := eventManager.listeners[stringUuid]
if !exists {
continue
}
imageListener <- status
close(imageListener)
delete(eventManager.listeners, stringUuid)
}
}
}

View File

@ -2,6 +2,7 @@ package main
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
@ -12,6 +13,7 @@ import (
"screenmark/screenmark/.gen/haystack/haystack/model"
"screenmark/screenmark/agents/client"
"screenmark/screenmark/models"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
@ -48,7 +50,10 @@ func main() {
auth := CreateAuth(mail)
go ListenNewImageEvents(db)
eventManager := NewEventManager()
go ListenNewImageEvents(db, &eventManager)
go ListenProcessingImageStatus(db, &eventManager)
r := chi.NewRouter()
@ -246,6 +251,41 @@ func main() {
})
r.Get("/image-events/{id}", func(w http.ResponseWriter, r *http.Request) {
// TODO: authentication :)
id := r.PathValue("id")
imageNotifier, exists := eventManager.listeners[id]
if !exists {
fmt.Println("Not found!")
w.WriteHeader(http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.(http.Flusher).Flush()
ctx, cancel := context.WithCancel(r.Context())
for {
select {
case <-ctx.Done():
fmt.Fprint(w, "event: close\ndata: Connection closed\n\n")
w.(http.Flusher).Flush()
cancel()
return
case data := <-imageNotifier:
fmt.Printf("Status received: %s\n", data)
fmt.Fprintf(w, "data: %s-%s\n", data, time.Now().String())
w.(http.Flusher).Flush()
cancel()
}
}
})
r.Post("/login", func(w http.ResponseWriter, r *http.Request) {
type LoginBody struct {
Email string `json:"email"`

View File

@ -130,6 +130,17 @@ func (m ImageModel) FinishProcessing(ctx context.Context, imageId uuid.UUID) (mo
return userImage, err
}
func (m ImageModel) StartProcessing(ctx context.Context, processingImageId uuid.UUID) error {
startProcessingStmt := UserImagesToProcess.
UPDATE(UserImagesToProcess.Status).
SET(model.Progress_InProgress).
WHERE(UserImagesToProcess.ID.EQ(UUID(processingImageId)))
_, err := startProcessingStmt.ExecContext(ctx, m.dbPool)
return err
}
func (m ImageModel) Get(ctx context.Context, imageId uuid.UUID) (ImageData, error) {
getImageStmt := SELECT(UserImages.AllColumns, Image.AllColumns).
FROM(

View File

@ -2,6 +2,10 @@ DROP SCHEMA IF EXISTS haystack CASCADE;
CREATE SCHEMA haystack;
/* -----| Enums |----- */
CREATE TYPE haystack.progress AS ENUM('not-started','in-progress');
/* -----| Schema tables |----- */
CREATE TABLE haystack.users (
@ -17,6 +21,7 @@ CREATE TABLE haystack.image (
CREATE TABLE haystack.user_images_to_process (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
status haystack.progress NOT NULL DEFAULT 'not-started',
image_id uuid NOT NULL UNIQUE REFERENCES haystack.image (id),
user_id uuid NOT NULL REFERENCES haystack.users (id)
);
@ -155,6 +160,14 @@ BEGIN
END
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION notify_new_processing_image_status()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('new_processing_image_status', NEW.id::text || NEW.status::text);
RETURN NEW;
END
$$ LANGUAGE plpgsql;
/* -----| Triggers |----- */
CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT
@ -162,6 +175,12 @@ ON haystack.user_images_to_process
FOR EACH ROW
EXECUTE PROCEDURE notify_new_image();
CREATE OR REPLACE TRIGGER on_update_image_progress
AFTER UPDATE OF status
ON haystack.user_images_to_process
FOR EACH ROW
EXECUTE PROCEDURE notify_new_processing_image_status();
/* -----| Test Data |----- */
-- Insert a user

View File

@ -41,6 +41,7 @@ const sendImageResponseValidator = strictObject({
ID: pipe(string(), uuid()),
ImageID: pipe(string(), uuid()),
UserID: pipe(string(), uuid()),
Status: string(),
});
export const sendImage = async (