Skip to content

Commit

Permalink
refactor: implement repo caching
Browse files Browse the repository at this point in the history
Signed-off-by: Nico Braun <rainbowstack@gmail.com>
  • Loading branch information
bluebrown committed Jan 19, 2024
1 parent dc6a5d3 commit 31f62f9
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 21 deletions.
37 changes: 24 additions & 13 deletions kioutil/kioutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -33,6 +34,7 @@ type GitPackageReadWriter struct {
msgFunc MessageFunc
ctx context.Context
msg string
cachePath string
}

func NewGitPackageReadWriter(ctx context.Context, uri, dstBranch string) *GitPackageReadWriter {
Expand All @@ -50,6 +52,7 @@ func (g GitPackageReadWriter) Read() ([]*yaml.RNode, error) {
srcURI: g.srcURI,
SetPathAnnotation: g.SetPathAnnotation,
ctx: g.ctx,
cachePath: g.cachePath,
}).Read()

}
Expand Down Expand Up @@ -86,6 +89,10 @@ func (g *GitPackageReadWriter) SetMsgFunc(fn MessageFunc) {
g.msgFunc = fn
}

func (g *GitPackageReadWriter) SetCachePath(path string) {
g.cachePath = path
}

const (
PathAnnotation = "kio.bluebrown.github.io/path"
)
Expand All @@ -94,6 +101,9 @@ type GitPackageReader struct {
srcURI GitPackageURI
SetPathAnnotation bool
ctx context.Context
// if set, the reader will read from
// the hostpath instead of cloning
cachePath string
}

func NewGitPackageReader(srcURI string) *GitPackageReader {
Expand All @@ -107,13 +117,17 @@ func (g GitPackageReader) Read() ([]*yaml.RNode, error) {
g.ctx = context.Background()
}

hostPath := mustTempDir()
pkgPath := filepath.Join(hostPath, g.srcURI.Pkg)

if err := gitE(g.ctx, "clone", "--branch", g.srcURI.Ref, "--depth", "1", g.srcURI.Repo, hostPath); err != nil {
return nil, fmt.Errorf("clone repo: %w", err)
if g.cachePath == "" {
g.cachePath = mustTempDir()
if err := gitE(g.ctx, "clone", "--branch", g.srcURI.Ref, "--depth", "1", g.srcURI.Repo, g.cachePath); err != nil {
return nil, fmt.Errorf("clone repo: %w", err)
}
} else {
slog.InfoContext(g.ctx, "cache hit, skip cloning", "repo", g.srcURI.Repo)
}

pkgPath := filepath.Join(g.cachePath, g.srcURI.Pkg)

r := kio.LocalPackageReader{
PackageFileName: ".krmignore",
IncludeSubpackages: true,
Expand All @@ -122,14 +136,14 @@ func (g GitPackageReader) Read() ([]*yaml.RNode, error) {
}

if g.SetPathAnnotation {
r.SetAnnotations = map[string]string{PathAnnotation: hostPath}
r.SetAnnotations = map[string]string{PathAnnotation: g.cachePath}
} else {
defer os.RemoveAll(hostPath)
defer os.RemoveAll(g.cachePath)
}

nodes, err := r.Read()
if err != nil {
os.RemoveAll(hostPath)
os.RemoveAll(g.cachePath)
return nil, fmt.Errorf("read package: %w", err)
}

Expand Down Expand Up @@ -173,15 +187,12 @@ func (g *GitPackageWriter) Write(nodes []*yaml.RNode) error {

if remoteHostPath == "" {
remoteHostPath = mustTempDir()
defer os.RemoveAll(remoteHostPath)
}

localHostPath := mustTempDir()
localPkgPath := filepath.Join(localHostPath)

defer func() {
_ = os.RemoveAll(localHostPath)
_ = os.RemoveAll(remoteHostPath)
}()
defer os.RemoveAll(localHostPath)

if err := os.MkdirAll(filepath.Dir(localPkgPath), 0755); err != nil {
return fmt.Errorf("create dir: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions krm/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ type Pipeline struct {
RepoURI string `json:"repoUri,omitempty"`
SrcBranch string `json:"sourceBranch,omitempty"`
DstBranch string `json:"destinationBranch,omitempty"`
CachePath string `json:"cachePath,omitempty"`
}

func (opts Pipeline) Run(ctx context.Context, imageRefs []string) (msg string, changes, warnings []string, err error) {
kf := NewImageRefUpdateFilter(nil, imageRefs...)

grw := kioutil.NewGitPackageReadWriter(ctx, opts.RepoURI, opts.DstBranch)

grw.SetCachePath(opts.CachePath)

grw.SetDiffFunc(func(s1, s2 string) (any, bool, error) {
return kf.Changes, len(kf.Changes) > 0, nil
})
Expand Down
7 changes: 4 additions & 3 deletions task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// the task handler is the final point of execution. after decoding, debouncing
// and aggregating the events, this handler is resonbible for the actual work
func KoboldHandler(ctx context.Context, g store.TaskGroup, runner HookRunner) ([]string, error) {
func KoboldHandler(ctx context.Context, cache string, g store.TaskGroup, runner HookRunner) ([]string, error) {
var (
changes []string
warnings []string
Expand Down Expand Up @@ -43,6 +43,7 @@ func KoboldHandler(ctx context.Context, g store.TaskGroup, runner HookRunner) ([
RepoURI: g.RepoUri.String(),
SrcBranch: g.RepoUri.Ref,
DstBranch: g.DestBranch.String,
CachePath: cache,
}

if err := ctx.Err(); err != nil {
Expand All @@ -68,7 +69,7 @@ func KoboldHandler(ctx context.Context, g store.TaskGroup, runner HookRunner) ([

var _ TaskHandler = KoboldHandler

func PrintHandler(ctx context.Context, g store.TaskGroup, runner HookRunner) ([]string, error) {
func PrintHandler(ctx context.Context, hostPath string, g store.TaskGroup, runner HookRunner) ([]string, error) {
b, err := json.MarshalIndent(g, "", " ")
if err != nil {
return nil, fmt.Errorf("marshal task group: %w", err)
Expand All @@ -79,7 +80,7 @@ func PrintHandler(ctx context.Context, g store.TaskGroup, runner HookRunner) ([]

var _ TaskHandler = PrintHandler

func ThrowHandler(ctx context.Context, g store.TaskGroup, runner HookRunner) ([]string, error) {
func ThrowHandler(ctx context.Context, hostPath string, g store.TaskGroup, runner HookRunner) ([]string, error) {
return nil, fmt.Errorf("throw handler error")
}

Expand Down
4 changes: 4 additions & 0 deletions task/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ var (
Name: "kobold_recv",
Help: "number of messages received",
}, []string{"channel", "rejected"})
metricClone = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kobold_clone",
Help: "number of clone",
}, []string{"repo"})
)
112 changes: 110 additions & 2 deletions task/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/volatiletech/null/v8"
"golang.org/x/sync/errgroup"
Expand All @@ -35,6 +40,7 @@ type Pool struct {
decoder Decoder
hookRunner HookRunner
cancel context.CancelFunc
size int
}

func NewPool(ctx context.Context, size int, queries *store.Queries) *Pool {
Expand All @@ -49,6 +55,7 @@ func NewPool(ctx context.Context, size int, queries *store.Queries) *Pool {
handler: nil,
decoder: NewStarlarkDecoder(),
hookRunner: NewStarlarkPostHook(),
size: size,
}
}

Expand All @@ -67,13 +74,37 @@ func (p *Pool) Dispatch() error {
return err
}

// each dispatch gets its own cache, to avoid collisions
cache := &repoCache{
repos: make(map[string]string),
base: filepath.Join(os.TempDir(), "kobold-cache-"+uuid.NewString()),
}

// find all unique repos and clone them beforehand in order to avoid
// exsessive cloning
if err := cache.fill(p.ctx, taskGroups, p.size); err != nil {
slog.WarnContext(p.ctx, "failed to fill cache", "error", err)
}

// the waitgroups is used to know when all task groups of this dispatch call
// have been processed. This is used to purge the cache
wg := sync.WaitGroup{}

go func() {
wg.Wait()
cache.purge()
}()

for _, g := range taskGroups {
g := g
wg.Add(1)

// NOTE, returning an error from this function provides a way to hault
// the entire pool. Do not return an error here, unless you want to stop
// the pool.
p.group.Go(func() (err error) {
defer wg.Done()

ids := []string(g.TaskIds)

swapped, err := p.queries.TaskGroupsStatusCompSwap(p.ctx, store.TaskGroupsStatusCompSwapParams{
Expand Down Expand Up @@ -109,7 +140,7 @@ func (p *Pool) Dispatch() error {
)

if p.handler != nil {
warns, err = p.handler(p.ctx, g, p.hookRunner)
warns, err = p.handler(p.ctx, cache.get(g.RepoUri.Repo), g, p.hookRunner)
// if the handler throws an error, we record the outcome but
// continue to process other tasks
if err != nil {
Expand Down Expand Up @@ -145,8 +176,8 @@ func (p *Pool) Dispatch() error {
return nil
})
}
return nil

return nil
}

func (p *Pool) Done() <-chan struct{} {
Expand Down Expand Up @@ -221,3 +252,80 @@ func (p *Pool) QueueReader(ctx context.Context, channel string, r io.Reader) err
}
return scanner.Err()
}

type repoCache struct {
repos map[string]string
base string
path string
}

func (cache *repoCache) fill(ctx context.Context, gg []store.TaskGroup, lim int) error {

// add a random dir inside the cache path to avoid collisions
cache.path = cache.base

if err := os.MkdirAll(cache.path, 0755); err != nil {
return err
}

for _, g := range gg {
cache.repos[g.RepoUri.Repo] = filepath.Join(cache.path, g.RepoUri.Repo)
}

g := errgroup.Group{}
g.SetLimit(lim)

for uri, path := range cache.repos {
uri, path := uri, path
g.Go(func() error {
return cache.ensure(ctx, uri, path)
})
}

err := g.Wait()

return err
}

func (cache *repoCache) ensure(ctx context.Context, uri, path string) error {
if err := ctx.Err(); err != nil {
return err
}

// since fetching leads to merge conflicts, we just delete the repo and
// clone it again with depth 1
if err := os.RemoveAll(path); err != nil {
return fmt.Errorf("remove repo %q: %w", uri, err)
}

cmd := exec.CommandContext(ctx, "git", "clone", "--depth", "1", uri, path)
if b, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("git clone %q: %w: %s", uri, err, string(b))
}

metricClone.With(prometheus.Labels{"repo": uri}).Inc()
slog.InfoContext(ctx, "repo cloned", "repo", uri, "cache", path)

return nil
}

func (cache *repoCache) get(repo string) string {
path := cache.repos[repo]
if path == "" {
return ""
}

d := filepath.Join(cache.path, uuid.NewString())

cmd := exec.Command("cp", "-r", path, d)
if b, err := cmd.CombinedOutput(); err != nil {
slog.Warn("failed to copy repo", "error", err, "output", string(b))
return ""
}

return d
}

func (cache *repoCache) purge() error {
return os.RemoveAll(cache.path)
}
2 changes: 1 addition & 1 deletion task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type HookRunner interface {
Run(group store.TaskGroup, msg string, changes []string, warnings []string) error
}

type TaskHandler func(ctx context.Context, g store.TaskGroup, hook HookRunner) ([]string, error)
type TaskHandler func(ctx context.Context, hostPath string, g store.TaskGroup, hook HookRunner) ([]string, error)

// implement the flag.Value interface
func (t *TaskHandler) String() string {
Expand Down
4 changes: 2 additions & 2 deletions testdata/events.http
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
POST /events?chan=dockerhub HTTP/1.1
Host: localhost:9999
Host: localhost:9000

{
"push_data": {
"tag": "v1.1.2"
"tag": "v1.1.3"
},
"repository": {
"repo_name": "library/busybox"
Expand Down

0 comments on commit 31f62f9

Please sign in to comment.