Compare commits
2 Commits
6523b10699
...
3a182fc49b
Author | SHA1 | Date | |
---|---|---|---|
3a182fc49b | |||
ec7bd469f9 |
@ -19,13 +19,63 @@ import (
|
|||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Notification struct {
|
const (
|
||||||
|
IMAGE_TYPE = "image"
|
||||||
|
LIST_TYPE = "list"
|
||||||
|
)
|
||||||
|
|
||||||
|
type imageNotification struct {
|
||||||
|
Type string
|
||||||
|
|
||||||
ImageID uuid.UUID
|
ImageID uuid.UUID
|
||||||
ImageName string
|
ImageName string
|
||||||
|
|
||||||
Status string
|
Status string
|
||||||
}
|
}
|
||||||
|
|
||||||
func ListenNewImageEvents(db *sql.DB, notifier *Notifier[Notification]) {
|
type listNotification struct {
|
||||||
|
Type string
|
||||||
|
|
||||||
|
ListID uuid.UUID
|
||||||
|
Name string
|
||||||
|
|
||||||
|
Status string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Notification struct {
|
||||||
|
image *imageNotification
|
||||||
|
list *listNotification
|
||||||
|
}
|
||||||
|
|
||||||
|
func getImageNotification(image imageNotification) Notification {
|
||||||
|
return Notification{
|
||||||
|
image: &image,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getListNotification(list listNotification) Notification {
|
||||||
|
return Notification{
|
||||||
|
list: &list,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n Notification) MarshalJSON() ([]byte, error) {
|
||||||
|
if n.image != nil {
|
||||||
|
return json.Marshal(n.image)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n.list != nil {
|
||||||
|
return json.Marshal(n.list)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("no image or list present")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Notification) UnmarshalJSON(data []byte) error {
|
||||||
|
return fmt.Errorf("unimplemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func ListenNewImageEvents(db *sql.DB) {
|
||||||
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -128,11 +178,12 @@ func ListenProcessingImageStatus(db *sql.DB, images models.ImageModel, notifier
|
|||||||
|
|
||||||
logger.Info("Update", "id", imageStringUuid, "status", status)
|
logger.Info("Update", "id", imageStringUuid, "status", status)
|
||||||
|
|
||||||
notification := Notification{
|
notification := getImageNotification(imageNotification{
|
||||||
|
Type: IMAGE_TYPE,
|
||||||
ImageID: processingImage.ImageID,
|
ImageID: processingImage.ImageID,
|
||||||
ImageName: processingImage.Image.ImageName,
|
ImageName: processingImage.Image.ImageName,
|
||||||
Status: status,
|
Status: status,
|
||||||
}
|
})
|
||||||
|
|
||||||
if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil {
|
if err := notifier.SendAndCreate(processingImage.UserID.String(), notification); err != nil {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
@ -186,11 +237,61 @@ func ListenNewStackEvents(db *sql.DB) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := stackModel.EndProcessing(ctx, stackID); err != nil {
|
||||||
|
newStacksLogger.Error("failed to finish processing", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
newStacksLogger.Debug("Finished processing stack", "StackID", stackID)
|
newStacksLogger.Debug("Finished processing stack", "StackID", stackID)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ListenProcessingStackStatus(db *sql.DB, stacks models.ListModel, notifier *Notifier[Notification]) {
|
||||||
|
listener := pq.NewListener(os.Getenv("DB_CONNECTION"), time.Second, time.Second, func(event pq.ListenerEventType, err error) {
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
defer listener.Close()
|
||||||
|
|
||||||
|
logger := createLogger("Stack Status 📊", os.Stdout)
|
||||||
|
|
||||||
|
if err := listener.Listen("new_processing_stack_status"); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for data := range listener.Notify {
|
||||||
|
stackStringUUID := data.Extra[0:36]
|
||||||
|
status := data.Extra[36:]
|
||||||
|
|
||||||
|
stackUUID, err := uuid.Parse(stackStringUUID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
processingStack, err := stacks.GetToProcess(context.Background(), stackUUID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("GetToProcess failed", "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Update", "id", stackStringUUID, "status", status)
|
||||||
|
|
||||||
|
notification := getListNotification(listNotification{
|
||||||
|
Type: LIST_TYPE,
|
||||||
|
Name: processingStack.Title,
|
||||||
|
ListID: stackUUID,
|
||||||
|
Status: status,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := notifier.SendAndCreate(processingStack.UserID.String(), notification); err != nil {
|
||||||
|
logger.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TODO: We have channels open every a user sends an image.
|
* TODO: We have channels open every a user sends an image.
|
||||||
* We never close these channels.
|
* We never close these channels.
|
||||||
@ -251,7 +352,8 @@ func CreateEventsHandler(notifier *Notifier[Notification]) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("Sending msg %s\n", msg)
|
fmt.Printf("Sending msg %s\n", msgString)
|
||||||
|
|
||||||
fmt.Fprintf(w, "event: data\ndata: %s\n\n", string(msgString))
|
fmt.Fprintf(w, "event: data\ndata: %s\n\n", string(msgString))
|
||||||
w.(http.Flusher).Flush()
|
w.(http.Flusher).Flush()
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ func (client TestAiClient) GetImageInfo(imageName string, imageData []byte) (cli
|
|||||||
|
|
||||||
func setupRouter(db *sql.DB) chi.Router {
|
func setupRouter(db *sql.DB) chi.Router {
|
||||||
imageModel := models.NewImageModel(db)
|
imageModel := models.NewImageModel(db)
|
||||||
|
stackModel := models.NewListModel(db)
|
||||||
|
|
||||||
stackHandler := stacks.CreateStackHandler(db)
|
stackHandler := stacks.CreateStackHandler(db)
|
||||||
authHandler := auth.CreateAuthHandler(db)
|
authHandler := auth.CreateAuthHandler(db)
|
||||||
@ -37,9 +38,15 @@ func setupRouter(db *sql.DB) chi.Router {
|
|||||||
|
|
||||||
// Only start event listeners if not in test environment
|
// Only start event listeners if not in test environment
|
||||||
if os.Getenv("GO_TEST_ENVIRONMENT") != "true" {
|
if os.Getenv("GO_TEST_ENVIRONMENT") != "true" {
|
||||||
go ListenNewImageEvents(db, ¬ifier)
|
|
||||||
|
// TODO: should extract these into a notification manager
|
||||||
|
// And actually make them the same code.
|
||||||
|
// The events are basically the same.
|
||||||
|
|
||||||
|
go ListenNewImageEvents(db)
|
||||||
go ListenProcessingImageStatus(db, imageModel, ¬ifier)
|
go ListenProcessingImageStatus(db, imageModel, ¬ifier)
|
||||||
go ListenNewStackEvents(db)
|
go ListenNewStackEvents(db)
|
||||||
|
go ListenProcessingStackStatus(db, stackModel, ¬ifier)
|
||||||
}
|
}
|
||||||
|
|
||||||
r := chi.NewRouter()
|
r := chi.NewRouter()
|
||||||
|
@ -92,6 +92,21 @@ func (m ListModel) GetProcessing(ctx context.Context, processingListID uuid.UUID
|
|||||||
return list, err
|
return list, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m ListModel) GetToProcess(ctx context.Context, listID uuid.UUID) (model.ProcessingLists, error) {
|
||||||
|
getToProcessStmt := ProcessingLists.
|
||||||
|
SELECT(ProcessingLists.AllColumns).
|
||||||
|
WHERE(ProcessingLists.ID.EQ(UUID(listID)))
|
||||||
|
|
||||||
|
stack := []model.ProcessingLists{}
|
||||||
|
err := getToProcessStmt.QueryContext(ctx, m.dbPool, &stack)
|
||||||
|
|
||||||
|
if len(stack) != 1 {
|
||||||
|
return model.ProcessingLists{}, fmt.Errorf("Expected 1, got %d\n", len(stack))
|
||||||
|
}
|
||||||
|
|
||||||
|
return stack[0], err
|
||||||
|
}
|
||||||
|
|
||||||
// ========================================
|
// ========================================
|
||||||
// UPDATE
|
// UPDATE
|
||||||
// ========================================
|
// ========================================
|
||||||
@ -106,6 +121,16 @@ func (m ListModel) StartProcessing(ctx context.Context, processingListID uuid.UU
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m ListModel) EndProcessing(ctx context.Context, processingListID uuid.UUID) error {
|
||||||
|
startProcessingStmt := ProcessingLists.
|
||||||
|
UPDATE(ProcessingLists.Status).
|
||||||
|
SET(model.Progress_Complete).
|
||||||
|
WHERE(ProcessingLists.ID.EQ(UUID(processingListID)))
|
||||||
|
|
||||||
|
_, err := startProcessingStmt.ExecContext(ctx, m.dbPool)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// ========================================
|
// ========================================
|
||||||
// INSERT methods
|
// INSERT methods
|
||||||
// ========================================
|
// ========================================
|
||||||
|
@ -124,6 +124,14 @@ PERFORM pg_notify('new_stack', NEW.id::text);
|
|||||||
END
|
END
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION notify_new_processing_stack_status()
|
||||||
|
RETURNS TRIGGER AS $$
|
||||||
|
BEGIN
|
||||||
|
PERFORM pg_notify('new_processing_stack_status', NEW.id::text || NEW.status::text);
|
||||||
|
RETURN NEW;
|
||||||
|
END
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
/* -----| Triggers |----- */
|
/* -----| Triggers |----- */
|
||||||
|
|
||||||
CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT
|
CREATE OR REPLACE TRIGGER on_new_image AFTER INSERT
|
||||||
@ -142,4 +150,10 @@ ON haystack.processing_lists
|
|||||||
FOR EACH ROW
|
FOR EACH ROW
|
||||||
EXECUTE PROCEDURE notify_new_stacks();
|
EXECUTE PROCEDURE notify_new_stacks();
|
||||||
|
|
||||||
|
CREATE OR REPLACE TRIGGER on_update_stack_progress
|
||||||
|
AFTER UPDATE OF status
|
||||||
|
ON haystack.processing_lists
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE PROCEDURE notify_new_processing_stack_status();
|
||||||
|
|
||||||
/* -----| Test Data |----- */
|
/* -----| Test Data |----- */
|
||||||
|
@ -8,7 +8,8 @@ export const ProcessingImages: Component = () => {
|
|||||||
const notifications = useNotifications();
|
const notifications = useNotifications();
|
||||||
|
|
||||||
const processingNumber = () =>
|
const processingNumber = () =>
|
||||||
Object.keys(notifications.state.ProcessingImages).length;
|
Object.keys(notifications.state.ProcessingImages).length +
|
||||||
|
Object.keys(notifications.state.ProcessingLists).length;
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<Popover sameWidth gutter={4}>
|
<Popover sameWidth gutter={4}>
|
||||||
@ -16,7 +17,7 @@ export const ProcessingImages: Component = () => {
|
|||||||
<Show when={processingNumber() > 0}>
|
<Show when={processingNumber() > 0}>
|
||||||
<p class="text-md">
|
<p class="text-md">
|
||||||
Processing {processingNumber()}{" "}
|
Processing {processingNumber()}{" "}
|
||||||
{processingNumber() === 1 ? "image" : "images"}
|
{processingNumber() === 1 ? "item" : "items"}
|
||||||
...
|
...
|
||||||
</p>
|
</p>
|
||||||
</Show>
|
</Show>
|
||||||
@ -30,10 +31,8 @@ export const ProcessingImages: Component = () => {
|
|||||||
<Popover.Portal>
|
<Popover.Portal>
|
||||||
<Popover.Content class="shadow-2xl flex flex-col gap-2 bg-white rounded-xl p-2">
|
<Popover.Content class="shadow-2xl flex flex-col gap-2 bg-white rounded-xl p-2">
|
||||||
<Show
|
<Show
|
||||||
when={
|
when={processingNumber() > 0}
|
||||||
Object.entries(notifications.state.ProcessingImages).length > 0
|
fallback={<p>No items to process</p>}
|
||||||
}
|
|
||||||
fallback={<p>No images to process</p>}
|
|
||||||
>
|
>
|
||||||
<For each={Object.entries(notifications.state.ProcessingImages)}>
|
<For each={Object.entries(notifications.state.ProcessingImages)}>
|
||||||
{([id, _image]) => (
|
{([id, _image]) => (
|
||||||
@ -57,6 +56,24 @@ export const ProcessingImages: Component = () => {
|
|||||||
</Show>
|
</Show>
|
||||||
)}
|
)}
|
||||||
</For>
|
</For>
|
||||||
|
|
||||||
|
<For each={Object.entries(notifications.state.ProcessingLists)}>
|
||||||
|
{([, _list]) => (
|
||||||
|
<Show when={_list}>
|
||||||
|
{(list) => (
|
||||||
|
<div class="flex gap-2 w-full justify-center">
|
||||||
|
<div class="flex flex-col gap-1">
|
||||||
|
<p class="text-slate-900">New Stack: {list().Name}</p>
|
||||||
|
</div>
|
||||||
|
<LoadingCircle
|
||||||
|
status="loading"
|
||||||
|
class="ml-auto self-center"
|
||||||
|
/>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
</Show>
|
||||||
|
)}
|
||||||
|
</For>
|
||||||
</Show>
|
</Show>
|
||||||
</Popover.Content>
|
</Popover.Content>
|
||||||
</Popover.Portal>
|
</Popover.Portal>
|
||||||
|
@ -10,18 +10,27 @@ import {
|
|||||||
useContext,
|
useContext,
|
||||||
} from "solid-js";
|
} from "solid-js";
|
||||||
import { base } from "@network/index";
|
import { base } from "@network/index";
|
||||||
import { processingImagesValidator } from "@network/notifications";
|
import {
|
||||||
|
notificationValidator,
|
||||||
|
processingImagesValidator,
|
||||||
|
processingListValidator,
|
||||||
|
} from "@network/notifications";
|
||||||
|
|
||||||
type NotificationState = {
|
type NotificationState = {
|
||||||
ProcessingImages: Record<
|
ProcessingImages: Record<
|
||||||
string,
|
string,
|
||||||
InferOutput<typeof processingImagesValidator> | undefined
|
InferOutput<typeof processingImagesValidator> | undefined
|
||||||
>;
|
>;
|
||||||
|
ProcessingLists: Record<
|
||||||
|
string,
|
||||||
|
InferOutput<typeof processingListValidator> | undefined
|
||||||
|
>;
|
||||||
};
|
};
|
||||||
|
|
||||||
export const Notifications = (onCompleteImage: () => void) => {
|
export const Notifications = (onCompleteImage: () => void) => {
|
||||||
const [state, setState] = createStore<NotificationState>({
|
const [state, setState] = createStore<NotificationState>({
|
||||||
ProcessingImages: {},
|
ProcessingImages: {},
|
||||||
|
ProcessingLists: {},
|
||||||
});
|
});
|
||||||
|
|
||||||
const { processingImages } = useSearchImageContext();
|
const { processingImages } = useSearchImageContext();
|
||||||
@ -45,21 +54,32 @@ export const Notifications = (onCompleteImage: () => void) => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const processingImage = safeParse(processingImagesValidator, jsonData);
|
const notification = safeParse(notificationValidator, jsonData);
|
||||||
if (!processingImage.success) {
|
if (!notification.success) {
|
||||||
console.error("Processing image could not be parsed.", e.data);
|
console.error("Processing image could not be parsed.", e.data);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log("SSE: ", processingImage);
|
console.log("SSE: ", notification);
|
||||||
|
|
||||||
const { ImageID, Status } = processingImage.output;
|
if (notification.output.Type === "image") {
|
||||||
|
const { ImageID, Status } = notification.output;
|
||||||
|
|
||||||
if (Status === "complete") {
|
if (Status === "complete") {
|
||||||
setState("ProcessingImages", ImageID, undefined);
|
setState("ProcessingImages", ImageID, undefined);
|
||||||
onCompleteImage();
|
onCompleteImage();
|
||||||
} else {
|
} else {
|
||||||
setState("ProcessingImages", ImageID, processingImage.output);
|
setState("ProcessingImages", ImageID, notification.output);
|
||||||
|
}
|
||||||
|
} else if (notification.output.Type === "list") {
|
||||||
|
const { ListID, Status } = notification.output;
|
||||||
|
|
||||||
|
if (Status === "complete") {
|
||||||
|
setState("ProcessingLists", ListID, undefined);
|
||||||
|
onCompleteImage();
|
||||||
|
} else {
|
||||||
|
setState("ProcessingLists", ListID, notification.output);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -83,6 +103,7 @@ export const Notifications = (onCompleteImage: () => void) => {
|
|||||||
images.map((i) => [
|
images.map((i) => [
|
||||||
i.ImageID,
|
i.ImageID,
|
||||||
{
|
{
|
||||||
|
Type: "image",
|
||||||
ImageID: i.ImageID,
|
ImageID: i.ImageID,
|
||||||
ImageName: i.Image.ImageName,
|
ImageName: i.Image.ImageName,
|
||||||
Status: i.Status,
|
Status: i.Status,
|
||||||
|
@ -1,6 +1,21 @@
|
|||||||
import { literal, pipe, strictObject, string, union, uuid } from "valibot";
|
import { literal, pipe, strictObject, string, union, uuid } from "valibot";
|
||||||
|
|
||||||
|
export const processingListValidator = strictObject({
|
||||||
|
Type: literal("list"),
|
||||||
|
|
||||||
|
Name: string(),
|
||||||
|
ListID: pipe(string(), uuid()),
|
||||||
|
|
||||||
|
Status: union([
|
||||||
|
literal("not-started"),
|
||||||
|
literal("in-progress"),
|
||||||
|
literal("complete"),
|
||||||
|
]),
|
||||||
|
});
|
||||||
|
|
||||||
export const processingImagesValidator = strictObject({
|
export const processingImagesValidator = strictObject({
|
||||||
|
Type: literal("image"),
|
||||||
|
|
||||||
ImageID: pipe(string(), uuid()),
|
ImageID: pipe(string(), uuid()),
|
||||||
ImageName: string(),
|
ImageName: string(),
|
||||||
Status: union([
|
Status: union([
|
||||||
@ -9,3 +24,8 @@ export const processingImagesValidator = strictObject({
|
|||||||
literal("complete"),
|
literal("complete"),
|
||||||
]),
|
]),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
export const notificationValidator = union([
|
||||||
|
processingListValidator,
|
||||||
|
processingImagesValidator,
|
||||||
|
]);
|
||||||
|
Reference in New Issue
Block a user