If the browser supports EventSource switch to use this instead of polling notifications. Signed-off-by: Andrew Thornton art27@cantab.netfor-closed-social
@ -0,0 +1,78 @@ | |||||
// Copyright 2020 The Gitea Authors. All rights reserved. | |||||
// Use of this source code is governed by a MIT-style | |||||
// license that can be found in the LICENSE file. | |||||
package integrations | |||||
import ( | |||||
"fmt" | |||||
"net/http" | |||||
"testing" | |||||
"time" | |||||
"code.gitea.io/gitea/models" | |||||
"code.gitea.io/gitea/modules/eventsource" | |||||
api "code.gitea.io/gitea/modules/structs" | |||||
"github.com/stretchr/testify/assert" | |||||
) | |||||
func TestEventSourceManagerRun(t *testing.T) { | |||||
defer prepareTestEnv(t)() | |||||
manager := eventsource.GetManager() | |||||
eventChan := manager.Register(2) | |||||
defer func() { | |||||
manager.Unregister(2, eventChan) | |||||
// ensure the eventChan is closed | |||||
for { | |||||
_, ok := <-eventChan | |||||
if !ok { | |||||
break | |||||
} | |||||
} | |||||
}() | |||||
expectNotificationCountEvent := func(count int64) func() bool { | |||||
return func() bool { | |||||
select { | |||||
case event, ok := <-eventChan: | |||||
if !ok { | |||||
return false | |||||
} | |||||
data, ok := event.Data.(models.UserIDCount) | |||||
if !ok { | |||||
return false | |||||
} | |||||
return event.Name == "notification-count" && data.Count == count | |||||
default: | |||||
return false | |||||
} | |||||
} | |||||
} | |||||
user2 := models.AssertExistsAndLoadBean(t, &models.User{ID: 2}).(*models.User) | |||||
repo1 := models.AssertExistsAndLoadBean(t, &models.Repository{ID: 1}).(*models.Repository) | |||||
thread5 := models.AssertExistsAndLoadBean(t, &models.Notification{ID: 5}).(*models.Notification) | |||||
assert.NoError(t, thread5.LoadAttributes()) | |||||
session := loginUser(t, user2.Name) | |||||
token := getTokenForLoggedInUser(t, session) | |||||
var apiNL []api.NotificationThread | |||||
// -- mark notifications as read -- | |||||
req := NewRequest(t, "GET", fmt.Sprintf("/api/v1/notifications?token=%s", token)) | |||||
resp := session.MakeRequest(t, req, http.StatusOK) | |||||
DecodeJSON(t, resp, &apiNL) | |||||
assert.Len(t, apiNL, 2) | |||||
lastReadAt := "2000-01-01T00%3A50%3A01%2B00%3A00" //946687801 <- only Notification 4 is in this filter ... | |||||
req = NewRequest(t, "PUT", fmt.Sprintf("/api/v1/repos/%s/%s/notifications?last_read_at=%s&token=%s", user2.Name, repo1.Name, lastReadAt, token)) | |||||
resp = session.MakeRequest(t, req, http.StatusResetContent) | |||||
req = NewRequest(t, "GET", fmt.Sprintf("/api/v1/notifications?token=%s", token)) | |||||
resp = session.MakeRequest(t, req, http.StatusOK) | |||||
DecodeJSON(t, resp, &apiNL) | |||||
assert.Len(t, apiNL, 1) | |||||
assert.Eventually(t, expectNotificationCountEvent(1), 30*time.Second, 1*time.Second) | |||||
} |
@ -0,0 +1,119 @@ | |||||
// Copyright 2020 The Gitea Authors. All rights reserved. | |||||
// Use of this source code is governed by a MIT-style | |||||
// license that can be found in the LICENSE file. | |||||
package eventsource | |||||
import ( | |||||
"bytes" | |||||
"encoding/json" | |||||
"fmt" | |||||
"io" | |||||
"strings" | |||||
"time" | |||||
) | |||||
func wrapNewlines(w io.Writer, prefix []byte, value []byte) (sum int64, err error) { | |||||
if len(value) == 0 { | |||||
return | |||||
} | |||||
n := 0 | |||||
last := 0 | |||||
for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') { | |||||
n, err = w.Write(prefix) | |||||
sum += int64(n) | |||||
if err != nil { | |||||
return | |||||
} | |||||
n, err = w.Write(value[last : last+j+1]) | |||||
sum += int64(n) | |||||
if err != nil { | |||||
return | |||||
} | |||||
last += j + 1 | |||||
} | |||||
n, err = w.Write(prefix) | |||||
sum += int64(n) | |||||
if err != nil { | |||||
return | |||||
} | |||||
n, err = w.Write(value[last:]) | |||||
sum += int64(n) | |||||
if err != nil { | |||||
return | |||||
} | |||||
n, err = w.Write([]byte("\n")) | |||||
sum += int64(n) | |||||
return | |||||
} | |||||
// Event is an eventsource event, not all fields need to be set | |||||
type Event struct { | |||||
// Name represents the value of the event: tag in the stream | |||||
Name string | |||||
// Data is either JSONified []byte or interface{} that can be JSONd | |||||
Data interface{} | |||||
// ID represents the ID of an event | |||||
ID string | |||||
// Retry tells the receiver only to attempt to reconnect to the source after this time | |||||
Retry time.Duration | |||||
} | |||||
// WriteTo writes data to w until there's no more data to write or when an error occurs. | |||||
// The return value n is the number of bytes written. Any error encountered during the write is also returned. | |||||
func (e *Event) WriteTo(w io.Writer) (int64, error) { | |||||
sum := int64(0) | |||||
nint := 0 | |||||
n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name)) | |||||
sum += n | |||||
if err != nil { | |||||
return sum, err | |||||
} | |||||
if e.Data != nil { | |||||
var data []byte | |||||
switch v := e.Data.(type) { | |||||
case []byte: | |||||
data = v | |||||
case string: | |||||
data = []byte(v) | |||||
default: | |||||
var err error | |||||
data, err = json.Marshal(e.Data) | |||||
if err != nil { | |||||
return sum, err | |||||
} | |||||
} | |||||
n, err := wrapNewlines(w, []byte("data: "), data) | |||||
sum += n | |||||
if err != nil { | |||||
return sum, err | |||||
} | |||||
} | |||||
n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID)) | |||||
sum += n | |||||
if err != nil { | |||||
return sum, err | |||||
} | |||||
if e.Retry != 0 { | |||||
nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond)) | |||||
sum += int64(nint) | |||||
if err != nil { | |||||
return sum, err | |||||
} | |||||
} | |||||
nint, err = w.Write([]byte("\n")) | |||||
sum += int64(nint) | |||||
return sum, err | |||||
} | |||||
func (e *Event) String() string { | |||||
buf := new(strings.Builder) | |||||
_, _ = e.WriteTo(buf) | |||||
return buf.String() | |||||
} |
@ -0,0 +1,54 @@ | |||||
// Copyright 2020 The Gitea Authors. All rights reserved. | |||||
// Use of this source code is governed by a MIT-style | |||||
// license that can be found in the LICENSE file. | |||||
package eventsource | |||||
import ( | |||||
"bytes" | |||||
"testing" | |||||
) | |||||
func Test_wrapNewlines(t *testing.T) { | |||||
tests := []struct { | |||||
name string | |||||
prefix string | |||||
value string | |||||
output string | |||||
}{ | |||||
{ | |||||
"check no new lines", | |||||
"prefix: ", | |||||
"value", | |||||
"prefix: value\n", | |||||
}, | |||||
{ | |||||
"check simple newline", | |||||
"prefix: ", | |||||
"value1\nvalue2", | |||||
"prefix: value1\nprefix: value2\n", | |||||
}, | |||||
{ | |||||
"check pathological newlines", | |||||
"p: ", | |||||
"\n1\n\n2\n3\n", | |||||
"p: \np: 1\np: \np: 2\np: 3\np: \n", | |||||
}, | |||||
} | |||||
for _, tt := range tests { | |||||
t.Run(tt.name, func(t *testing.T) { | |||||
w := &bytes.Buffer{} | |||||
gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value)) | |||||
if err != nil { | |||||
t.Errorf("wrapNewlines() error = %v", err) | |||||
return | |||||
} | |||||
if gotSum != int64(len(tt.output)) { | |||||
t.Errorf("wrapNewlines() = %v, want %v", gotSum, int64(len(tt.output))) | |||||
} | |||||
if gotW := w.String(); gotW != tt.output { | |||||
t.Errorf("wrapNewlines() = %v, want %v", gotW, tt.output) | |||||
} | |||||
}) | |||||
} | |||||
} |
@ -0,0 +1,84 @@ | |||||
// Copyright 2020 The Gitea Authors. All rights reserved. | |||||
// Use of this source code is governed by a MIT-style | |||||
// license that can be found in the LICENSE file. | |||||
package eventsource | |||||
import ( | |||||
"sync" | |||||
) | |||||
// Manager manages the eventsource Messengers | |||||
type Manager struct { | |||||
mutex sync.Mutex | |||||
messengers map[int64]*Messenger | |||||
} | |||||
var manager *Manager | |||||
func init() { | |||||
manager = &Manager{ | |||||
messengers: make(map[int64]*Messenger), | |||||
} | |||||
} | |||||
// GetManager returns a Manager and initializes one as singleton if there's none yet | |||||
func GetManager() *Manager { | |||||
return manager | |||||
} | |||||
// Register message channel | |||||
func (m *Manager) Register(uid int64) <-chan *Event { | |||||
m.mutex.Lock() | |||||
messenger, ok := m.messengers[uid] | |||||
if !ok { | |||||
messenger = NewMessenger(uid) | |||||
m.messengers[uid] = messenger | |||||
} | |||||
m.mutex.Unlock() | |||||
return messenger.Register() | |||||
} | |||||
// Unregister message channel | |||||
func (m *Manager) Unregister(uid int64, channel <-chan *Event) { | |||||
m.mutex.Lock() | |||||
defer m.mutex.Unlock() | |||||
messenger, ok := m.messengers[uid] | |||||
if !ok { | |||||
return | |||||
} | |||||
if messenger.Unregister(channel) { | |||||
delete(m.messengers, uid) | |||||
} | |||||
} | |||||
// UnregisterAll message channels | |||||
func (m *Manager) UnregisterAll() { | |||||
m.mutex.Lock() | |||||
defer m.mutex.Unlock() | |||||
for _, messenger := range m.messengers { | |||||
messenger.UnregisterAll() | |||||
} | |||||
m.messengers = map[int64]*Messenger{} | |||||
} | |||||
// SendMessage sends a message to a particular user | |||||
func (m *Manager) SendMessage(uid int64, message *Event) { | |||||
m.mutex.Lock() | |||||
messenger, ok := m.messengers[uid] | |||||
m.mutex.Unlock() | |||||
if ok { | |||||
messenger.SendMessage(message) | |||||
} | |||||
} | |||||
// SendMessageBlocking sends a message to a particular user | |||||
func (m *Manager) SendMessageBlocking(uid int64, message *Event) { | |||||
m.mutex.Lock() | |||||
messenger, ok := m.messengers[uid] | |||||
m.mutex.Unlock() | |||||
if ok { | |||||
messenger.SendMessageBlocking(message) | |||||
} | |||||
} |
@ -0,0 +1,50 @@ | |||||
// Copyright 2020 The Gitea Authors. All rights reserved. | |||||
// Use of this source code is governed by a MIT-style | |||||
// license that can be found in the LICENSE file. | |||||
package eventsource | |||||
import ( | |||||
"context" | |||||
"time" | |||||
"code.gitea.io/gitea/models" | |||||
"code.gitea.io/gitea/modules/graceful" | |||||
"code.gitea.io/gitea/modules/log" | |||||
"code.gitea.io/gitea/modules/setting" | |||||
"code.gitea.io/gitea/modules/timeutil" | |||||
) | |||||
// Init starts this eventsource | |||||
func (m *Manager) Init() { | |||||
go graceful.GetManager().RunWithShutdownContext(m.Run) | |||||
} | |||||
// Run runs the manager within a provided context | |||||
func (m *Manager) Run(ctx context.Context) { | |||||
then := timeutil.TimeStampNow().Add(-2) | |||||
timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime) | |||||
loop: | |||||
for { | |||||
select { | |||||
case <-ctx.Done(): | |||||
timer.Stop() | |||||
break loop | |||||
case <-timer.C: | |||||
now := timeutil.TimeStampNow().Add(-2) | |||||
uidCounts, err := models.GetUIDsAndNotificationCounts(then, now) | |||||
if err != nil { | |||||
log.Error("Unable to get UIDcounts: %v", err) | |||||
} | |||||
for _, uidCount := range uidCounts { | |||||
m.SendMessage(uidCount.UserID, &Event{ | |||||
Name: "notification-count", | |||||
Data: uidCount, | |||||
}) | |||||
} | |||||
then = now | |||||
} | |||||
} | |||||
m.UnregisterAll() | |||||
} |
@ -0,0 +1,78 @@ | |||||
// Copyright 2020 The Gitea Authors. All rights reserved. | |||||
// Use of this source code is governed by a MIT-style | |||||
// license that can be found in the LICENSE file. | |||||
package eventsource | |||||
import "sync" | |||||
// Messenger is a per uid message store | |||||
type Messenger struct { | |||||
mutex sync.Mutex | |||||
uid int64 | |||||
channels []chan *Event | |||||
} | |||||
// NewMessenger creates a messenger for a particular uid | |||||
func NewMessenger(uid int64) *Messenger { | |||||
return &Messenger{ | |||||
uid: uid, | |||||
channels: [](chan *Event){}, | |||||
} | |||||
} | |||||
// Register returns a new chan []byte | |||||
func (m *Messenger) Register() <-chan *Event { | |||||
m.mutex.Lock() | |||||
// TODO: Limit the number of messengers per uid | |||||
channel := make(chan *Event, 1) | |||||
m.channels = append(m.channels, channel) | |||||
m.mutex.Unlock() | |||||
return channel | |||||
} | |||||
// Unregister removes the provider chan []byte | |||||
func (m *Messenger) Unregister(channel <-chan *Event) bool { | |||||
m.mutex.Lock() | |||||
defer m.mutex.Unlock() | |||||
for i, toRemove := range m.channels { | |||||
if channel == toRemove { | |||||
m.channels = append(m.channels[:i], m.channels[i+1:]...) | |||||
close(toRemove) | |||||
break | |||||
} | |||||
} | |||||
return len(m.channels) == 0 | |||||
} | |||||
// UnregisterAll removes all chan []byte | |||||
func (m *Messenger) UnregisterAll() { | |||||
m.mutex.Lock() | |||||
defer m.mutex.Unlock() | |||||
for _, channel := range m.channels { | |||||
close(channel) | |||||
} | |||||
m.channels = nil | |||||
} | |||||
// SendMessage sends the message to all registered channels | |||||
func (m *Messenger) SendMessage(message *Event) { | |||||
m.mutex.Lock() | |||||
defer m.mutex.Unlock() | |||||
for i := range m.channels { | |||||
channel := m.channels[i] | |||||
select { | |||||
case channel <- message: | |||||
default: | |||||
} | |||||
} | |||||
} | |||||
// SendMessageBlocking sends the message to all registered channels and ensures it gets sent | |||||
func (m *Messenger) SendMessageBlocking(message *Event) { | |||||
m.mutex.Lock() | |||||
defer m.mutex.Unlock() | |||||
for i := range m.channels { | |||||
m.channels[i] <- message | |||||
} | |||||
} |
@ -0,0 +1,112 @@ | |||||
// Copyright 2020 The Gitea Authors. All rights reserved. | |||||
// Use of this source code is governed by a MIT-style | |||||
// license that can be found in the LICENSE file. | |||||
package events | |||||
import ( | |||||
"net/http" | |||||
"time" | |||||
"code.gitea.io/gitea/modules/context" | |||||
"code.gitea.io/gitea/modules/eventsource" | |||||
"code.gitea.io/gitea/modules/graceful" | |||||
"code.gitea.io/gitea/modules/log" | |||||
"code.gitea.io/gitea/routers/user" | |||||
) | |||||
// Events listens for events | |||||
func Events(ctx *context.Context) { | |||||
// FIXME: Need to check if resp is actually a http.Flusher! - how though? | |||||
// Set the headers related to event streaming. | |||||
ctx.Resp.Header().Set("Content-Type", "text/event-stream") | |||||
ctx.Resp.Header().Set("Cache-Control", "no-cache") | |||||
ctx.Resp.Header().Set("Connection", "keep-alive") | |||||
ctx.Resp.Header().Set("X-Accel-Buffering", "no") | |||||
ctx.Resp.WriteHeader(http.StatusOK) | |||||
// Listen to connection close and un-register messageChan | |||||
notify := ctx.Req.Context().Done() | |||||
ctx.Resp.Flush() | |||||
shutdownCtx := graceful.GetManager().ShutdownContext() | |||||
uid := ctx.User.ID | |||||
messageChan := eventsource.GetManager().Register(uid) | |||||
unregister := func() { | |||||
eventsource.GetManager().Unregister(uid, messageChan) | |||||
// ensure the messageChan is closed | |||||
for { | |||||
_, ok := <-messageChan | |||||
if !ok { | |||||
break | |||||
} | |||||
} | |||||
} | |||||
if _, err := ctx.Resp.Write([]byte("\n")); err != nil { | |||||
log.Error("Unable to write to EventStream: %v", err) | |||||
unregister() | |||||
return | |||||
} | |||||
timer := time.NewTicker(30 * time.Second) | |||||
loop: | |||||
for { | |||||
select { | |||||
case <-timer.C: | |||||
event := &eventsource.Event{ | |||||
Name: "ping", | |||||
} | |||||
_, err := event.WriteTo(ctx.Resp) | |||||
if err != nil { | |||||
log.Error("Unable to write to EventStream for user %s: %v", ctx.User.Name, err) | |||||
go unregister() | |||||
break loop | |||||
} | |||||
ctx.Resp.Flush() | |||||
case <-notify: | |||||
go unregister() | |||||
break loop | |||||
case <-shutdownCtx.Done(): | |||||
go unregister() | |||||
break loop | |||||
case event, ok := <-messageChan: | |||||
if !ok { | |||||
break loop | |||||
} | |||||
// Handle logout | |||||
if event.Name == "logout" { | |||||
if ctx.Session.ID() == event.Data { | |||||
_, _ = (&eventsource.Event{ | |||||
Name: "logout", | |||||
Data: "here", | |||||
}).WriteTo(ctx.Resp) | |||||
ctx.Resp.Flush() | |||||
go unregister() | |||||
user.HandleSignOut(ctx) | |||||
break loop | |||||
} | |||||
// Replace the event - we don't want to expose the session ID to the user | |||||
event = (&eventsource.Event{ | |||||
Name: "logout", | |||||
Data: "elsewhere", | |||||
}) | |||||
} | |||||
_, err := event.WriteTo(ctx.Resp) | |||||
if err != nil { | |||||
log.Error("Unable to write to EventStream for user %s: %v", ctx.User.Name, err) | |||||
go unregister() | |||||
break loop | |||||
} | |||||
ctx.Resp.Flush() | |||||
} | |||||
} | |||||
timer.Stop() | |||||
} |