2 Commits

Author SHA1 Message Date
3a182fc49b working notifications across backend and frontend 2025-08-25 15:22:24 +01:00
ec7bd469f9 sending notifications about new stacks 2025-08-25 15:13:29 +01:00
7 changed files with 229 additions and 23 deletions

View File

@ -19,13 +19,63 @@ import (
"github.com/lib/pq" "github.com/lib/pq"
) )
type Notification struct { const (
IMAGE_TYPE = "image"
LIST_TYPE = "list"
)
type imageNotification struct {
Type string
ImageID uuid.UUID ImageID uuid.UUID
ImageName string ImageName string
Status string Status string
} }
func ListenNewImageEvents(db *sql.DB, notifier *Notifier[Notification]) { type listNotification struct {
Type string
ListID uuid.UUID
Name string
Status string
}
type Notification struct {
image *imageNotification
list *listNotification
}
func getImageNotification(image imageNotification) Notification {
return Notification{
image: &image,
}
}
func getListNotification(list listNotification) Notification {
return Notification{
list: &list,
}
}
func (n Notification) MarshalJSON() ([]byte, error) {
if n.image != nil {
return json.Marshal(n.image)
}
if n.list != nil {
return json.Marshal(n.list)
}
return nil, fmt.Errorf("no image or list present")
}
func (n *Notification) UnmarshalJSON(data []byte) error {
return fmt.Errorf("unimplemented")
}
func ListenNewImageEvents(db *sql.DB) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) { listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil { if err != nil {
panic(err) panic(err)
@ -128,11 +178,12 @@ func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier
logger.Info("Update", "id", imageStringUuid, "status", status) logger.Info("Update", "id", imageStringUuid, "status", status)
notification := Notification{ notification := getImageNotification(imageNotification{
Type: IMAGE_TYPE,
ImageID: processingImage.ImageID, ImageID: processingImage.ImageID,
ImageName: processingImage.Image.ImageName, ImageName: processingImage.Image.ImageName,
Status: status, Status: status,
} })
if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil { if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil {
logger.Error(err) logger.Error(err)
@ -186,11 +237,61 @@ func ListenNewStackEvents(db *sql.DB) {
return return
} }
if err := stackModel.EndProcessing(ctx, stackID); err != nil {
newStacksLogger.Error("failed to finish processing", "error", err)
return
}
newStacksLogger.Debug("Finished processing stack", "StackID", stackID) newStacksLogger.Debug("Finished processing stack", "StackID", stackID)
}() }()
} }
} }
func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *Notifier[Notification]) {
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
if err != nil {
panic(err)
}
})
defer listener.Close()
logger := createLogger("Stack Status 📊", os.Stdout)
if err := listener.Listen("new_processing_stack_status"); err != nil {
panic(err)
}
for data := range listener.Notify {
stackStringUUID := data.Extra[0:36]
status := data.Extra[36:]
stackUUID, err := uuid.Parse(stackStringUUID)
if err != nil {
logger.Error(err)
continue
}
processingStack, err := stacks.GetToProcess(context.Background(), stackUUID)
if err != nil {
logger.Error("GetToProcess failed", "err", err)
continue
}
logger.Info("Update", "id", stackStringUUID, "status", status)
notification := getListNotification(listNotification{
Type: LIST_TYPE,
Name: processingStack.Title,
ListID: stackUUID,
Status: status,
})
if err := notifier.SendAndCreate(processingStack.UserID.String(), notification); err != nil {
logger.Error(err)
}
}
}
/* /*
* TODO: We have channels open every a user sends an image. * TODO: We have channels open every a user sends an image.
* We never close these channels. * We never close these channels.
@ -251,7 +352,8 @@ func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
return return
} }
fmt.Printf("Sending msg %s\n", msg) fmt.Printf("Sending msg %s\n", msgString)
fmt.Fprintf(w, "event: data\ndata: %s\n\n", string(msgString)) fmt.Fprintf(w, "event: data\ndata: %s\n\n", string(msgString))
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }

View File

@ -28,6 +28,7 @@ func (client TestAiClient) GetImageInfo(imageName string, imageData []byte) (cli
func setupRouter(db *sql.DB) chi.Router { func setupRouter(db *sql.DB) chi.Router {
imageModel := models.NewImageModel(db) imageModel := models.NewImageModel(db)
stackModel := models.NewListModel(db)
stackHandler := stacks.CreateStackHandler(db) stackHandler := stacks.CreateStackHandler(db)
authHandler := auth.CreateAuthHandler(db) authHandler := auth.CreateAuthHandler(db)
@ -37,9 +38,15 @@ func setupRouter(db *sql.DB) chi.Router {
// Only start event listeners if not in test environment // Only start event listeners if not in test environment
if os.Getenv("GO_TEST_ENVIRONMENT") != "true" { if os.Getenv("GO_TEST_ENVIRONMENT") != "true" {
go ListenNewImageEvents(db, &notifier)
// TODO: should extract these into a notification manager
// And actually make them the same code.
// The events are basically the same.
go ListenNewImageEvents(db)
go ListenProcessingImageStatus(db, imageModel, &notifier) go ListenProcessingImageStatus(db, imageModel, &notifier)
go ListenNewStackEvents(db) go ListenNewStackEvents(db)
go ListenProcessingStackStatus(db, stackModel, &notifier)
} }
r := chi.NewRouter() r := chi.NewRouter()

View File

@ -92,6 +92,21 @@ func (m ListModel) GetProcessing(ctx context.Context, processingListID uuid.UUID
return list, err return list, err
} }
func (m ListModel) GetToProcess(ctx context.Context, listID uuid.UUID) (model.ProcessingLists, error) {
getToProcessStmt := ProcessingLists.
SELECT(ProcessingLists.AllColumns).
WHERE(ProcessingLists.ID.EQ(UUID(listID)))
stack := []model.ProcessingLists{}
err := getToProcessStmt.QueryContext(ctx, m.dbPool, &stack)
if len(stack) != 1 {
return model.ProcessingLists{}, fmt.Errorf("Expected 1, got %d\n", len(stack))
}
return stack[0], err
}
// ======================================== // ========================================
// UPDATE // UPDATE
// ======================================== // ========================================
@ -106,6 +121,16 @@ func (m ListModel) StartProcessing(ctx context.Context, processingListID uuid.UU
return err return err
} }
func (m ListModel) EndProcessing(ctx context.Context, processingListID uuid.UUID) error {
startProcessingStmt := ProcessingLists.
UPDATE(ProcessingLists.Status).
SET(model.Progress_Complete).
WHERE(ProcessingLists.ID.EQ(UUID(processingListID)))
_, err := startProcessingStmt.ExecContext(ctx, m.dbPool)
return err
}
// ======================================== // ========================================
// INSERT methods // INSERT methods
// ======================================== // ========================================

View File

@ -124,6 +124,14 @@ PERFORM pg_notify('new_stack', NEW.id::text);
END END
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION notify_new_processing_stack_status()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('new_processing_stack_status', NEW.id::text || NEW.status::text);
RETURN NEW;
END
$$ LANGUAGE plpgsql;
/* -----| Triggers |----- */ /* -----| Triggers |----- */
CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT
@ -142,4 +150,10 @@ ON haystack.processing_lists
FOR EACH ROW FOR EACH ROW
EXECUTE PROCEDURE notify_new_stacks(); EXECUTE PROCEDURE notify_new_stacks();
CREATE OR REPLACE TRIGGER on_update_stack_progress
AFTER UPDATE OF status
ON haystack.processing_lists
FOR EACH ROW
EXECUTE PROCEDURE notify_new_processing_stack_status();
/* -----| Test Data |----- */ /* -----| Test Data |----- */

View File

@ -8,7 +8,8 @@ export const ProcessingImages: Component = () => {
const notifications = useNotifications(); const notifications = useNotifications();
const processingNumber = () => const processingNumber = () =>
Object.keys(notifications.state.ProcessingImages).length; Object.keys(notifications.state.ProcessingImages).length +
Object.keys(notifications.state.ProcessingLists).length;
return ( return (
<Popover sameWidth gutter={4}> <Popover sameWidth gutter={4}>
@ -16,7 +17,7 @@ export const ProcessingImages: Component = () => {
<Show when={processingNumber() > 0}> <Show when={processingNumber() > 0}>
<p class="text-md"> <p class="text-md">
Processing {processingNumber()}{" "} Processing {processingNumber()}{" "}
{processingNumber() === 1 ? "image" : "images"} {processingNumber() === 1 ? "item" : "items"}
... ...
</p> </p>
</Show> </Show>
@ -30,10 +31,8 @@ export const ProcessingImages: Component = () => {
<Popover.Portal> <Popover.Portal>
<Popover.Content class="shadow-2xl flex flex-col gap-2 bg-white rounded-xl p-2"> <Popover.Content class="shadow-2xl flex flex-col gap-2 bg-white rounded-xl p-2">
<Show <Show
when={ when={processingNumber() > 0}
Object.entries(notifications.state.ProcessingImages).length > 0 fallback={<p>No items to process</p>}
}
fallback={<p>No images to process</p>}
> >
<For each={Object.entries(notifications.state.ProcessingImages)}> <For each={Object.entries(notifications.state.ProcessingImages)}>
{([id, _image]) => ( {([id, _image]) => (
@ -57,6 +56,24 @@ export const ProcessingImages: Component = () => {
</Show> </Show>
)} )}
</For> </For>
<For each={Object.entries(notifications.state.ProcessingLists)}>
{([, _list]) => (
<Show when={_list}>
{(list) => (
<div class="flex gap-2 w-full justify-center">
<div class="flex flex-col gap-1">
<p class="text-slate-900">New Stack: {list().Name}</p>
</div>
<LoadingCircle
status="loading"
class="ml-auto self-center"
/>
</div>
)}
</Show>
)}
</For>
</Show> </Show>
</Popover.Content> </Popover.Content>
</Popover.Portal> </Popover.Portal>

View File

@ -10,18 +10,27 @@ import {
useContext, useContext,
} from "solid-js"; } from "solid-js";
import { base } from "@network/index"; import { base } from "@network/index";
import { processingImagesValidator } from "@network/notifications"; import {
notificationValidator,
processingImagesValidator,
processingListValidator,
} from "@network/notifications";
type NotificationState = { type NotificationState = {
ProcessingImages: Record< ProcessingImages: Record<
string, string,
InferOutput<typeof processingImagesValidator> | undefined InferOutput<typeof processingImagesValidator> | undefined
>; >;
ProcessingLists: Record<
string,
InferOutput<typeof processingListValidator> | undefined
>;
}; };
export const Notifications = (onCompleteImage: () => void) => { export const Notifications = (onCompleteImage: () => void) => {
const [state, setState] = createStore<NotificationState>({ const [state, setState] = createStore<NotificationState>({
ProcessingImages: {}, ProcessingImages: {},
ProcessingLists: {},
}); });
const { processingImages } = useSearchImageContext(); const { processingImages } = useSearchImageContext();
@ -45,21 +54,32 @@ export const Notifications = (onCompleteImage: () => void) => {
return; return;
} }
const processingImage = safeParse(processingImagesValidator, jsonData); const notification = safeParse(notificationValidator, jsonData);
if (!processingImage.success) { if (!notification.success) {
console.error("Processing image could not be parsed.", e.data); console.error("Processing image could not be parsed.", e.data);
return; return;
} }
console.log("SSE: ", processingImage); console.log("SSE: ", notification);
const { ImageID, Status } = processingImage.output; if (notification.output.Type === "image") {
const { ImageID, Status } = notification.output;
if (Status === "complete") { if (Status === "complete") {
setState("ProcessingImages", ImageID, undefined); setState("ProcessingImages", ImageID, undefined);
onCompleteImage(); onCompleteImage();
} else { } else {
setState("ProcessingImages", ImageID, processingImage.output); setState("ProcessingImages", ImageID, notification.output);
}
} else if (notification.output.Type === "list") {
const { ListID, Status } = notification.output;
if (Status === "complete") {
setState("ProcessingLists", ListID, undefined);
onCompleteImage();
} else {
setState("ProcessingLists", ListID, notification.output);
}
} }
}; };
@ -83,6 +103,7 @@ export const Notifications = (onCompleteImage: () => void) => {
images.map((i) => [ images.map((i) => [
i.ImageID, i.ImageID,
{ {
Type: "image",
ImageID: i.ImageID, ImageID: i.ImageID,
ImageName: i.Image.ImageName, ImageName: i.Image.ImageName,
Status: i.Status, Status: i.Status,

View File

@ -1,6 +1,21 @@
import { literal, pipe, strictObject, string, union, uuid } from "valibot"; import { literal, pipe, strictObject, string, union, uuid } from "valibot";
export const processingListValidator = strictObject({
Type: literal("list"),
Name: string(),
ListID: pipe(string(), uuid()),
Status: union([
literal("not-started"),
literal("in-progress"),
literal("complete"),
]),
});
export const processingImagesValidator = strictObject({ export const processingImagesValidator = strictObject({
Type: literal("image"),
ImageID: pipe(string(), uuid()), ImageID: pipe(string(), uuid()),
ImageName: string(), ImageName: string(),
Status: union([ Status: union([
@ -9,3 +24,8 @@ export const processingImagesValidator = strictObject({
literal("complete"), literal("complete"),
]), ]),
}); });
export const notificationValidator = union([
processingListValidator,
processingImagesValidator,
]);