Revert "FIXUP wip: notifications on starting progress"
This reverts commit 91b9e5402e9f153348f1326ee269533e1e47f777.
This commit is contained in:
@ -1,18 +0,0 @@
|
|||||||
//
|
|
||||||
// Code generated by go-jet DO NOT EDIT.
|
|
||||||
//
|
|
||||||
// WARNING: Changes to this file may cause incorrect behavior
|
|
||||||
// and will be lost if the code is regenerated
|
|
||||||
//
|
|
||||||
|
|
||||||
package enum
|
|
||||||
|
|
||||||
import "github.com/go-jet/jet/v2/postgres"
|
|
||||||
|
|
||||||
var Progress = &struct {
|
|
||||||
NotStarted postgres.StringExpression
|
|
||||||
InProgress postgres.StringExpression
|
|
||||||
}{
|
|
||||||
NotStarted: postgres.NewEnumValue("not-started"),
|
|
||||||
InProgress: postgres.NewEnumValue("in-progress"),
|
|
||||||
}
|
|
@ -1,49 +0,0 @@
|
|||||||
//
|
|
||||||
// Code generated by go-jet DO NOT EDIT.
|
|
||||||
//
|
|
||||||
// WARNING: Changes to this file may cause incorrect behavior
|
|
||||||
// and will be lost if the code is regenerated
|
|
||||||
//
|
|
||||||
|
|
||||||
package model
|
|
||||||
|
|
||||||
import "errors"
|
|
||||||
|
|
||||||
type Progress string
|
|
||||||
|
|
||||||
const (
|
|
||||||
Progress_NotStarted Progress = "not-started"
|
|
||||||
Progress_InProgress Progress = "in-progress"
|
|
||||||
)
|
|
||||||
|
|
||||||
var ProgressAllValues = []Progress{
|
|
||||||
Progress_NotStarted,
|
|
||||||
Progress_InProgress,
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Progress) Scan(value interface{}) error {
|
|
||||||
var enumValue string
|
|
||||||
switch val := value.(type) {
|
|
||||||
case string:
|
|
||||||
enumValue = val
|
|
||||||
case []byte:
|
|
||||||
enumValue = string(val)
|
|
||||||
default:
|
|
||||||
return errors.New("jet: Invalid scan value for AllTypesEnum enum. Enum value has to be of type string or []byte")
|
|
||||||
}
|
|
||||||
|
|
||||||
switch enumValue {
|
|
||||||
case "not-started":
|
|
||||||
*e = Progress_NotStarted
|
|
||||||
case "in-progress":
|
|
||||||
*e = Progress_InProgress
|
|
||||||
default:
|
|
||||||
return errors.New("jet: Invalid scan value '" + enumValue + "' for Progress enum")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e Progress) String() string {
|
|
||||||
return string(e)
|
|
||||||
}
|
|
@ -13,7 +13,6 @@ import (
|
|||||||
|
|
||||||
type UserImagesToProcess struct {
|
type UserImagesToProcess struct {
|
||||||
ID uuid.UUID `sql:"primary_key"`
|
ID uuid.UUID `sql:"primary_key"`
|
||||||
Status Progress
|
|
||||||
ImageID uuid.UUID
|
ImageID uuid.UUID
|
||||||
UserID uuid.UUID
|
UserID uuid.UUID
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,6 @@ type userImagesToProcessTable struct {
|
|||||||
|
|
||||||
// Columns
|
// Columns
|
||||||
ID postgres.ColumnString
|
ID postgres.ColumnString
|
||||||
Status postgres.ColumnString
|
|
||||||
ImageID postgres.ColumnString
|
ImageID postgres.ColumnString
|
||||||
UserID postgres.ColumnString
|
UserID postgres.ColumnString
|
||||||
|
|
||||||
@ -62,11 +61,10 @@ func newUserImagesToProcessTable(schemaName, tableName, alias string) *UserImage
|
|||||||
func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userImagesToProcessTable {
|
func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userImagesToProcessTable {
|
||||||
var (
|
var (
|
||||||
IDColumn = postgres.StringColumn("id")
|
IDColumn = postgres.StringColumn("id")
|
||||||
StatusColumn = postgres.StringColumn("status")
|
|
||||||
ImageIDColumn = postgres.StringColumn("image_id")
|
ImageIDColumn = postgres.StringColumn("image_id")
|
||||||
UserIDColumn = postgres.StringColumn("user_id")
|
UserIDColumn = postgres.StringColumn("user_id")
|
||||||
allColumns = postgres.ColumnList{IDColumn, StatusColumn, ImageIDColumn, UserIDColumn}
|
allColumns = postgres.ColumnList{IDColumn, ImageIDColumn, UserIDColumn}
|
||||||
mutableColumns = postgres.ColumnList{StatusColumn, ImageIDColumn, UserIDColumn}
|
mutableColumns = postgres.ColumnList{ImageIDColumn, UserIDColumn}
|
||||||
)
|
)
|
||||||
|
|
||||||
return userImagesToProcessTable{
|
return userImagesToProcessTable{
|
||||||
@ -74,7 +72,6 @@ func newUserImagesToProcessTableImpl(schemaName, tableName, alias string) userIm
|
|||||||
|
|
||||||
//Columns
|
//Columns
|
||||||
ID: IDColumn,
|
ID: IDColumn,
|
||||||
Status: StatusColumn,
|
|
||||||
ImageID: ImageIDColumn,
|
ImageID: ImageIDColumn,
|
||||||
UserID: UserIDColumn,
|
UserID: UserIDColumn,
|
||||||
|
|
||||||
|
@ -3,7 +3,6 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"screenmark/screenmark/agents"
|
"screenmark/screenmark/agents"
|
||||||
@ -63,8 +62,8 @@ func ListenNewImageEvents(db *sql.DB) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_, err = imageModel.FinishProcessing(ctx, image.ID)
|
||||||
if err := imageModel.StartProcessing(ctx, image.ID); err != nil {
|
if err != nil {
|
||||||
log.Println("Failed to FinishProcessing")
|
log.Println("Failed to FinishProcessing")
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
return
|
return
|
||||||
@ -81,29 +80,7 @@ func ListenNewImageEvents(db *sql.DB) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
imageModel.FinishProcessing(ctx, image.ID)
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ListenProcessingImageStatus(db *sql.DB) {
|
|
||||||
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
defer listener.Close()
|
|
||||||
|
|
||||||
if err := listener.Listen("new_processing_image_status"); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case data := <- listener.Notify
|
|
||||||
fmt.Println(data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -49,7 +49,6 @@ func main() {
|
|||||||
auth := CreateAuth(mail)
|
auth := CreateAuth(mail)
|
||||||
|
|
||||||
go ListenNewImageEvents(db)
|
go ListenNewImageEvents(db)
|
||||||
go ListenProcessingImageStatus(db)
|
|
||||||
|
|
||||||
r := chi.NewRouter()
|
r := chi.NewRouter()
|
||||||
|
|
||||||
@ -245,11 +244,6 @@ func main() {
|
|||||||
w.Header().Add("Content-Type", "application/json")
|
w.Header().Add("Content-Type", "application/json")
|
||||||
})
|
})
|
||||||
|
|
||||||
r.Get("/image-events/{id}", func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.Header().Set("Content-Type", "text/event-stream")
|
|
||||||
w.Header().Set("Cache-Control", "no-cache")
|
|
||||||
w.Header().Set("Connection", "keep-alive")
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
r.Post("/login", func(w http.ResponseWriter, r *http.Request) {
|
r.Post("/login", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -130,14 +130,6 @@ func (m ImageModel) FinishProcessing(ctx context.Context, imageId uuid.UUID) (mo
|
|||||||
return userImage, err
|
return userImage, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m ImageModel) StartProcessing(ctx context.Context, processingImageId uuid.UUID) error {
|
|
||||||
startProcessingStmt := UserImagesToProcess.UPDATE(UserImagesToProcess.Status).SET(model.Progress_InProgress)
|
|
||||||
|
|
||||||
_, err := startProcessingStmt.ExecContext(ctx, m.dbPool)
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m ImageModel) Get(ctx context.Context, imageId uuid.UUID) (ImageData, error) {
|
func (m ImageModel) Get(ctx context.Context, imageId uuid.UUID) (ImageData, error) {
|
||||||
getImageStmt := SELECT(UserImages.AllColumns, Image.AllColumns).
|
getImageStmt := SELECT(UserImages.AllColumns, Image.AllColumns).
|
||||||
FROM(
|
FROM(
|
||||||
|
@ -2,10 +2,6 @@ DROP SCHEMA IF EXISTS haystack CASCADE;
|
|||||||
|
|
||||||
CREATE SCHEMA haystack;
|
CREATE SCHEMA haystack;
|
||||||
|
|
||||||
/* -----| Enums |----- */
|
|
||||||
|
|
||||||
CREATE TYPE haystack.progress AS ENUM('not-started','in-progress');
|
|
||||||
|
|
||||||
/* -----| Schema tables |----- */
|
/* -----| Schema tables |----- */
|
||||||
|
|
||||||
CREATE TABLE haystack.users (
|
CREATE TABLE haystack.users (
|
||||||
@ -21,7 +17,6 @@ CREATE TABLE haystack.image (
|
|||||||
|
|
||||||
CREATE TABLE haystack.user_images_to_process (
|
CREATE TABLE haystack.user_images_to_process (
|
||||||
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
|
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||||
status haystack.progress NOT NULL DEFAULT 'not-started',
|
|
||||||
image_id uuid NOT NULL UNIQUE REFERENCES haystack.image (id),
|
image_id uuid NOT NULL UNIQUE REFERENCES haystack.image (id),
|
||||||
user_id uuid NOT NULL REFERENCES haystack.users (id)
|
user_id uuid NOT NULL REFERENCES haystack.users (id)
|
||||||
);
|
);
|
||||||
@ -160,14 +155,6 @@ BEGIN
|
|||||||
END
|
END
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION notify_new_processing_image_status()
|
|
||||||
RETURNS TRIGGER AS $$
|
|
||||||
BEGIN
|
|
||||||
PERFORM pg_notify('new_processing_image_status', NEW.status::text);
|
|
||||||
RETURN NEW;
|
|
||||||
END
|
|
||||||
$$ LANGUAGE plpgsql;
|
|
||||||
|
|
||||||
/* -----| Triggers |----- */
|
/* -----| Triggers |----- */
|
||||||
|
|
||||||
CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT
|
CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT
|
||||||
@ -175,12 +162,6 @@ ON haystack.user_images_to_process
|
|||||||
FOR EACH ROW
|
FOR EACH ROW
|
||||||
EXECUTE PROCEDURE notify_new_image();
|
EXECUTE PROCEDURE notify_new_image();
|
||||||
|
|
||||||
CREATE OR REPLACE TRIGGER on_update_image_progress
|
|
||||||
AFTER UPDATE OF status
|
|
||||||
ON haystack.user_images_to_process
|
|
||||||
FOR EACH ROW
|
|
||||||
EXECUTE PROCEDURE notify_new_processing_image_status();
|
|
||||||
|
|
||||||
/* -----| Test Data |----- */
|
/* -----| Test Data |----- */
|
||||||
|
|
||||||
-- Insert a user
|
-- Insert a user
|
||||||
|
Reference in New Issue
Block a user