FIXUP wip: notifications on starting progress

This commit is contained in:
2025-04-12 15:55:58 +01:00
parent cf7d5e0305
commit 91b9e5402e
8 changed files with 131 additions and 4 deletions

View File

@ -0,0 +1,18 @@
//
// Code generated by go-jet DO NOT EDIT.
//
// WARNING: Changes to this file may cause incorrect behavior
// and will be lost if the code is regenerated
//
package enum
import "github.com/go-jet/jet/v2/postgres"
var Progress = &struct {
NotStarted postgres.StringExpression
InProgress postgres.StringExpression
}{
NotStarted: postgres.NewEnumValue("not-started"),
InProgress: postgres.NewEnumValue("in-progress"),
}

View File

@ -0,0 +1,49 @@
//
// Code generated by go-jet DO NOT EDIT.
//
// WARNING: Changes to this file may cause incorrect behavior
// and will be lost if the code is regenerated
//
package model
import "errors"
type Progress string
const (
Progress_NotStarted Progress = "not-started"
Progress_InProgress Progress = "in-progress"
)
var ProgressAllValues = []Progress{
Progress_NotStarted,
Progress_InProgress,
}
func (e *Progress) Scan(value interface{}) error {
var enumValue string
switch val := value.(type) {
case string:
enumValue = val
case []byte:
enumValue = string(val)
default:
return errors.New("jet: Invalid scan value for AllTypesEnum enum. Enum value has to be of type string or []byte")
}
switch enumValue {
case "not-started":
*e = Progress_NotStarted
case "in-progress":
*e = Progress_InProgress
default:
return errors.New("jet: Invalid scan value '" + enumValue + "' for Progress enum")
}
return nil
}
func (e Progress) String() string {
return string(e)
}

View File

@ -13,6 +13,7 @@ import (
type UserImagesToProcess struct {
ID uuid.UUID `sql:"primary_key"`
Status Progress
ImageID uuid.UUID
UserID uuid.UUID
}

View File

@ -18,6 +18,7 @@ type userImagesToProcessTable struct {
// Columns
ID postgres.ColumnString
Status postgres.ColumnString
ImageID postgres.ColumnString
UserID postgres.ColumnString
@ -61,10 +62,11 @@ func newUserImagesToProcessTable(schemaName, tableName, alias string) *UserImage
func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userImagesToProcessTable {
var (
IDColumn = postgres.StringColumn("id")
StatusColumn = postgres.StringColumn("status")
ImageIDColumn = postgres.StringColumn("image_id")
UserIDColumn = postgres.StringColumn("user_id")
allColumns = postgres.ColumnList{IDColumn, ImageIDColumn, UserIDColumn}
mutableColumns = postgres.ColumnList{ImageIDColumn, UserIDColumn}
allColumns = postgres.ColumnList{IDColumn, StatusColumn, ImageIDColumn, UserIDColumn}
mutableColumns = postgres.ColumnList{StatusColumn, ImageIDColumn, UserIDColumn}
)
return userImagesToProcessTable{
@ -72,6 +74,7 @@ func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userIm
//Columns
ID: IDColumn,
Status: StatusColumn,
ImageID: ImageIDColumn,
UserID: UserIDColumn,

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"screenmark/screenmark/agents"
@ -62,8 +63,8 @@ func ListenNewImageEvents(db *sql.DB) {
return
}
_, err = imageModel.FinishProcessing(ctx, image.ID)
if err != nil {
if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
log.Println("Failed to FinishProcessing")
log.Println(err)
return
@ -80,7 +81,29 @@ func ListenNewImageEvents(db *sql.DB) {
if err != nil {
log.Println(err)
}
imageModel.FinishProcessing(ctx, image.ID)
}()
}
}
}
func ListenProcessingImageStatus(db *sql.DB) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil {
panic(err)
}
})
defer listener.Close()
if err := listener.Listen("new_processing_image_status"); err != nil {
panic(err)
}
for {
select {
case data := <- listener.Notify
fmt.Println(data)
}
}
}

View File

@ -49,6 +49,7 @@ func main() {
auth := CreateAuth(mail)
go ListenNewImageEvents(db)
go ListenProcessingImageStatus(db)
r := chi.NewRouter()
@ -244,6 +245,11 @@ func main() {
w.Header().Add("Content-Type", "application/json")
})
r.Get("/image-events/{id}", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
})
})
r.Post("/login", func(w http.ResponseWriter, r *http.Request) {

View File

@ -130,6 +130,14 @@ func (m ImageModel) FinishProcessing(ctx context.Context, imageId uuid.UUID) (mo
return userImage, err
}
func (m ImageModel) StartProcessing(ctx context.Context, processingImageId uuid.UUID) error {
startProcessingStmt := UserImagesToProcess.UPDATE(UserImagesToProcess.Status).SET(model.Progress_InProgress)
_, err := startProcessingStmt.ExecContext(ctx, m.dbPool)
return err
}
func (m ImageModel) Get(ctx context.Context, imageId uuid.UUID) (ImageData, error) {
getImageStmt := SELECT(UserImages.AllColumns, Image.AllColumns).
FROM(

View File

@ -2,6 +2,10 @@ DROP SCHEMA IF EXISTS haystack CASCADE;
CREATE SCHEMA haystack;
/* -----| Enums |----- */
CREATE TYPE haystack.progress AS ENUM('not-started','in-progress');
/* -----| Schema tables |----- */
CREATE TABLE haystack.users (
@ -17,6 +21,7 @@ CREATE TABLE haystack.image (
CREATE TABLE haystack.user_images_to_process (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
status haystack.progress NOT NULL DEFAULT 'not-started',
image_id uuid NOT NULL UNIQUE REFERENCES haystack.image (id),
user_id uuid NOT NULL REFERENCES haystack.users (id)
);
@ -155,6 +160,14 @@ BEGIN
END
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION notify_new_processing_image_status()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('new_processing_image_status', NEW.status::text);
RETURN NEW;
END
$$ LANGUAGE plpgsql;
/* -----| Triggers |----- */
CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT
@ -162,6 +175,12 @@ ON haystack.user_images_to_process
FOR EACH ROW
EXECUTE PROCEDURE notify_new_image();
CREATE OR REPLACE TRIGGER on_update_image_progress
AFTER UPDATE OF status
ON haystack.user_images_to_process
FOR EACH ROW
EXECUTE PROCEDURE notify_new_processing_image_status();
/* -----| Test Data |----- */
-- Insert a user