Skip to content

Commit

Permalink
feat: optimized viewer msg sending with parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
dd84ai committed Nov 24, 2023
1 parent 7a65f66 commit 0711e78
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 72 deletions.
10 changes: 8 additions & 2 deletions app/discorder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"darkbot/app/settings"
"darkbot/app/settings/logus"
"darkbot/app/settings/types"
"darkbot/app/settings/utils"
"fmt"
"time"

"github.com/bwmarrin/discordgo"
Expand Down Expand Up @@ -41,8 +43,12 @@ func (d *Discorder) SengMessage(channelID types.DiscordChannelID, content string
}

func (d *Discorder) EditMessage(channelID types.DiscordChannelID, messageID types.DiscordMessageID, content string) error {
_, err := d.dg.ChannelMessageEdit(string(channelID), string(messageID), content)
logus.CheckWarn(err, "failed editing message in discorder", logus.ChannelID(channelID))
var err error
utils.TimeMeasure(func() {
msg, err := d.dg.ChannelMessageEdit(string(channelID), string(messageID), content)
logus.CheckWarn(err, "failed editing message in discorder", logus.ChannelID(channelID))
logus.Debug(fmt.Sprintf("Discorder.EditMessage.msg=%v", msg))
}, fmt.Sprintf("Discorder.EditMessage content=%s", content), logus.ChannelID(channelID), logus.MessageID(messageID))
return err
}

Expand Down
10 changes: 3 additions & 7 deletions app/forumer/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"darkbot/app/forumer/forum_types"
"darkbot/app/settings/logus"
"darkbot/app/settings/types"
"os"
"darkbot/app/settings/utils"
"testing"
)

Expand All @@ -26,10 +26,6 @@ func newMockedThreadsQuery() MockedThreadsQuery {
return MockedThreadsQuery{threads: one_thread}
}

func FixtureDevEnv() bool {
return os.Getenv("DEV_ENV") == "true"
}

func TestForumerSending(t *testing.T) {

mocked_post_requester := FixtureDetailedRequester()
Expand All @@ -40,7 +36,7 @@ func TestForumerSending(t *testing.T) {

cg_channel := configurator.NewConfiguratorChannel(configurator.NewConfigurator(dbpath))

if FixtureDevEnv() {
if utils.FixtureDevEnv() {
cg_channel.Add(dev_env_channel)
}

Expand All @@ -64,7 +60,7 @@ func TestSubForumSending(t *testing.T) {

cg_channel := configurator.NewConfiguratorChannel(configurator.NewConfigurator(dbpath))

if FixtureDevEnv() {
if utils.FixtureDevEnv() {
cg_channel.Add(dev_env_channel)
}

Expand Down
2 changes: 2 additions & 0 deletions app/settings/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type APIurl string
type ScrappyLoopDelay int
type ViewerLoopDelay int

type ViewerDelayBetweenChannels int

type DiscordChannelID string

type DiscordMessageID string
Expand Down
7 changes: 7 additions & 0 deletions app/settings/utils/fixtures.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package utils

import "os"

func FixtureDevEnv() bool {
return os.Getenv("DEV_ENV") == "true"
}
31 changes: 31 additions & 0 deletions app/settings/utils/time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package utils

import (
"darkbot/app/settings/logus"
"fmt"
"time"
)

type timeMeasurer struct {
msg string
ops []logus.SlogParam
time_started time.Time
}

func NewTimeMeasure(msg string, ops ...logus.SlogParam) *timeMeasurer {
return &timeMeasurer{
msg: msg,
ops: ops,
time_started: time.Now(),
}
}

func (t *timeMeasurer) Close() {
logus.Debug(fmt.Sprintf("time_measure %v | %s", time.Since(t.time_started), t.msg), t.ops...)
}

func TimeMeasure(callback func(), msg string, ops ...logus.SlogParam) {
time_started := NewTimeMeasure(msg, ops...)
defer time_started.Close()
callback()
}
5 changes: 3 additions & 2 deletions app/settings/worker/worker_temp.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func NewTask(id worker_types.TaskID) *Task {
const (
CodeSuccess worker_types.TaskStatusCode = 0
CodeTimeout worker_types.TaskStatusCode = 1
CodeFailure worker_types.TaskStatusCode = 2
)

type TaskPool[taskT ITask] struct {
Expand Down Expand Up @@ -74,11 +75,11 @@ func NewTaskPool[T ITask](opts ...TaskPoolOption[T]) *TaskPool[T] {
}

func (j *TaskPool[taskT]) launchWorker(worker_id worker_types.WorkerID, tasks <-chan taskT, results chan<- worker_types.TaskStatusCode) {
logus.Debug("worker started", worker_logus.WorkerID(worker_id))
logus.Info("worker started", worker_logus.WorkerID(worker_id))
for task := range tasks {
results <- task.RunTask(worker_id)
}
logus.Debug("worker finished", worker_logus.WorkerID(worker_id))
logus.Info("worker finished", worker_logus.WorkerID(worker_id))
}

/// Temporal
Expand Down
8 changes: 6 additions & 2 deletions app/viewer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"darkbot/app/discorder"
"darkbot/app/settings/logus"
"darkbot/app/settings/types"
"darkbot/app/settings/utils"
"darkbot/app/viewer/apis"
"darkbot/app/viewer/views"
"darkbot/app/viewer/views/baseview"
"darkbot/app/viewer/views/eventview"
"darkbot/app/viewer/views/playerview"
"fmt"
"strings"
"time"
)
Expand Down Expand Up @@ -61,8 +63,10 @@ func (v *ChannelView) Render() {
// Edit if message ID is present.
// Send if not present.
func (v ChannelView) Send() {
for _, view := range v.views {
view.Send()
for view_num, view := range v.views {
utils.TimeMeasure(func() {
view.Send()
}, fmt.Sprintf("view.Send view_num=%d, view=%v", view_num, view), logus.ChannelID(v.ChannelID))
}
}

Expand Down
105 changes: 105 additions & 0 deletions app/viewer/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package viewer

import (
"darkbot/app/settings/logus"
"darkbot/app/settings/types"
"darkbot/app/settings/utils"
"darkbot/app/settings/worker"
"darkbot/app/settings/worker/worker_types"
"darkbot/app/viewer/apis"
"fmt"
"sync"
"time"
)

type TaskRefreshChannel struct {
*worker.Task

// any desired arbitary data
api *apis.API
channelID types.DiscordChannelID
delayBetweenChannels types.ViewerDelayBetweenChannels
}

func NewRefreshChannelTask(
api *apis.API,
channelID types.DiscordChannelID,
delayBetweenChannels types.ViewerDelayBetweenChannels,
) *TaskRefreshChannel {
task_id_gen += 1
return &TaskRefreshChannel{
Task: worker.NewTask(worker_types.TaskID(task_id_gen)),
api: api,
channelID: channelID,
delayBetweenChannels: delayBetweenChannels,
}
}

var task_id_gen int = 0

var guildAntiRateLimitMutexes map[string]*sync.Mutex

func init() {
guildAntiRateLimitMutexes = make(map[string]*sync.Mutex)
}

func GetMutex(MutexKey string) *sync.Mutex {
value, ok := guildAntiRateLimitMutexes[MutexKey]

if ok {
return value
}

new_mutex := &sync.Mutex{}
guildAntiRateLimitMutexes[MutexKey] = new_mutex
return new_mutex
}

func (v *TaskRefreshChannel) RunTask(worker_id worker_types.WorkerID) worker_types.TaskStatusCode {
channel_info, err := v.api.Discorder.GetDiscordSession().Channel(string(v.channelID))
if logus.CheckError(err, "unable to get channel info") {
return worker.CodeFailure
}

MutexKey := channel_info.GuildID
GuildMutex := GetMutex(MutexKey)
GuildMutex.Lock()
defer GuildMutex.Unlock()

time_run_task_started := time.Now()
time_new_channel := utils.NewTimeMeasure("new_channel", logus.ChannelID(v.channelID))
channel := NewChannelView(v.api, v.channelID)

time_new_channel.Close()

time_render := utils.NewTimeMeasure("channel.Render", logus.ChannelID(v.channelID))
channel.Render()
time_render.Close()

time_discover := utils.NewTimeMeasure("channel.Discover", logus.ChannelID(v.channelID))
err = channel.Discover()
time_discover.Close()

if logus.CheckWarn(err, "unable to grab Discord msgs", logus.ChannelID(v.channelID)) {
return worker.CodeFailure
}

time_send := utils.NewTimeMeasure("channel.Send", logus.ChannelID(v.channelID))
channel.Send()
time_send.Close()

time_delete_old := utils.NewTimeMeasure("channel.DeleteOld", logus.ChannelID(v.channelID))
channel.DeleteOld()
time_delete_old.Close()
v.SetAsDone()
logus.Info(fmt.Sprintf("RunTask finished, TaskID=%d, elapsed=%s, started_at=%s, finished_at=%s",
v.Task.GetID(),
time.Since(time_run_task_started).String(),
time_run_task_started.String(),
time.Now().String(),
))

// Important for Mutex above! Prevents Guild level rate limits. looks like 5 msg edits per 5 second at one server is good
time.Sleep(time.Duration(v.delayBetweenChannels) * time.Second)
return worker.CodeSuccess
}
47 changes: 31 additions & 16 deletions app/viewer/viewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,72 @@ import (
"darkbot/app/settings"
"darkbot/app/settings/logus"
"darkbot/app/settings/types"
"darkbot/app/settings/utils"
"darkbot/app/settings/worker"
"darkbot/app/viewer/apis"
"time"
)

type ViewerDelays struct {
betweenChannels int
betweenChannels types.ViewerDelayBetweenChannels
betweenLoops types.ViewerLoopDelay
}

type Viewer struct {
delays ViewerDelays
api *apis.API
delays ViewerDelays
api *apis.API
workers *worker.TaskPoolPeristent[*TaskRefreshChannel]
}

func NewViewer(dbpath types.Dbpath, scrappy_storage *scrappy.ScrappyStorage) *Viewer {
api := apis.NewAPI(dbpath, scrappy_storage)
return &Viewer{
v := &Viewer{
api: api,
delays: ViewerDelays{
betweenChannels: 1,
betweenChannels: 10,
betweenLoops: settings.ViewerLoopDelay,
},
}

v.workers = worker.NewTaskPoolPersistent[*TaskRefreshChannel](
worker.WithAllowFailedTasks[*TaskRefreshChannel](),
worker.WithDisableParallelism[*TaskRefreshChannel](false),
worker.WithWorkersAmount[*TaskRefreshChannel](10),
)

return v
}

func (v *Viewer) Run() {
logus.Info("Viewer is now running.")

go func() {
for {
v.workers.AwaitSomeTask()
}
}()
for {
v.Update()
}
}

func (v Viewer) Update() {
func (v *Viewer) Update() {
time_viewer_started := time.Now()
logus.Info("Viewer.Update")

// Query all channels
channelIDs, _ := v.api.Channels.List()
logus.Info("Viewer.Update.channelIDs=", logus.ChannelIDs(channelIDs))

// For each channel
allChannelsTime := utils.NewTimeMeasure("all channels")
for _, channelID := range channelIDs {
channel := NewChannelView(v.api, channelID)
channel.Render()
err := channel.Discover()
if logus.CheckWarn(err, "unable to grab Discord msgs", logus.ChannelID(channelID)) {
continue
}
channel.Send()
channel.DeleteOld()
time.Sleep(time.Duration(v.delays.betweenChannels) * time.Second)
utils.TimeMeasure(func() {
task := NewRefreshChannelTask(v.api, channelID, v.delays.betweenChannels)
// task.RunTask(worker_types.WorkerID(0))
v.workers.DelayTask(task)
}, "one channel", logus.ChannelID(channelID))
}
allChannelsTime.Close()
logus.Info("Viewer.Update Finished " + time.Since(time_viewer_started).String())
time.Sleep(time.Duration(v.delays.betweenLoops) * time.Second)
}
13 changes: 13 additions & 0 deletions app/viewer/viewer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package viewer

import (
"darkbot/app/settings/utils"
"testing"
)

func TestDebugPerformance(t *testing.T) {
if !utils.FixtureDevEnv() {
return
}

}
Loading

0 comments on commit 0711e78

Please sign in to comment.