refactor: creating image process to handle processing of images
Decoupling this from the DB, it's a good step. Not yet perfect however.
This commit is contained in:
94
backend/notifier/notifications.go
Normal file
94
backend/notifier/notifications.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
type Notifier[TNotification any] struct {
|
||||
bufferSize int
|
||||
|
||||
Listeners map[string]chan TNotification
|
||||
}
|
||||
|
||||
func (n *Notifier[TNotification]) Create(id string) error {
|
||||
if _, exists := n.Listeners[id]; exists {
|
||||
return errors.New("This listener already exists")
|
||||
}
|
||||
|
||||
n.Listeners[id] = make(chan TNotification, n.bufferSize)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var ChannelFullErr = errors.New("Channel is full")
|
||||
|
||||
// Ensures the listener exists before sending
|
||||
func (n *Notifier[TNotification]) SendAndCreate(id string, notification TNotification) error {
|
||||
if _, exists := n.Listeners[id]; !exists {
|
||||
if err := n.Create(id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ch := n.Listeners[id]
|
||||
|
||||
select {
|
||||
case ch <- notification:
|
||||
return nil
|
||||
default:
|
||||
return ChannelFullErr
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notifier[TNotification]) Delete(id string) error {
|
||||
if _, exists := n.Listeners[id]; !exists {
|
||||
return errors.New("This listener does not exists")
|
||||
}
|
||||
|
||||
delete(n.Listeners, id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewNotifier[TNotification any](bufferSize int) Notifier[TNotification] {
|
||||
return Notifier[TNotification]{
|
||||
bufferSize: bufferSize,
|
||||
Listeners: make(map[string]chan TNotification),
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------
|
||||
|
||||
type ChannelSplitter[TNotification any] struct {
|
||||
ch chan TNotification
|
||||
|
||||
Listeners map[string]chan TNotification
|
||||
}
|
||||
|
||||
func (s *ChannelSplitter[TNotification]) Listen() {
|
||||
go func() {
|
||||
for msg := range s.ch {
|
||||
for _, v := range s.Listeners {
|
||||
v <- msg
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *ChannelSplitter[TNotification]) Add(id string) chan TNotification {
|
||||
ch := make(chan TNotification)
|
||||
s.Listeners[id] = ch
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
func (s *ChannelSplitter[TNotification]) Remove(id string) {
|
||||
delete(s.Listeners, id)
|
||||
}
|
||||
|
||||
func NewChannelSplitter[TNotification any](ch chan TNotification) ChannelSplitter[TNotification] {
|
||||
return ChannelSplitter[TNotification]{
|
||||
ch: ch,
|
||||
Listeners: make(map[string]chan TNotification),
|
||||
}
|
||||
}
|
||||
48
backend/notifier/notifications_test.go
Normal file
48
backend/notifier/notifications_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSendingNotifications(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
||||
notifier := NewNotifier[string](3)
|
||||
|
||||
err := notifier.SendAndCreate("1", "a")
|
||||
require.NoError(err)
|
||||
|
||||
err = notifier.SendAndCreate("1", "b")
|
||||
require.NoError(err)
|
||||
|
||||
err = notifier.SendAndCreate("1", "c")
|
||||
require.NoError(err)
|
||||
|
||||
ch := notifier.Listeners["1"]
|
||||
|
||||
a := <-ch
|
||||
b := <-ch
|
||||
c := <-ch
|
||||
|
||||
assert.Equal(a, "a")
|
||||
assert.Equal(b, "b")
|
||||
assert.Equal(c, "c")
|
||||
}
|
||||
|
||||
func TestFullBuffer(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
||||
notifier := NewNotifier[string](1)
|
||||
|
||||
err := notifier.SendAndCreate("1", "a")
|
||||
require.NoError(err)
|
||||
|
||||
err = notifier.SendAndCreate("1", "b")
|
||||
|
||||
assert.Error(err)
|
||||
}
|
||||
Reference in New Issue
Block a user