Skip to content

Commit

Permalink
Revert changes to fullreader
Browse files Browse the repository at this point in the history
  • Loading branch information
CalebQ42 committed Jan 4, 2023
1 parent 658e5c9 commit 089ef53
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 121 deletions.
205 changes: 85 additions & 120 deletions internal/data/fullreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package data

import (
"io"
"sync"

"github.com/CalebQ42/squashfs/internal/decompress"
"github.com/CalebQ42/squashfs/internal/toreader"
Expand All @@ -27,9 +26,9 @@ func NewFullReader(r io.ReaderAt, start uint64, d decompress.Decompressor, block
}
}

func (r *FullReader) AddFragment(rdr func() (io.Reader, error), size uint32) {
func (r *FullReader) AddFragment(rdr func() (io.Reader, error)) {
r.fragRdr = rdr
r.sizes = append(r.sizes, size)
r.sizes = append(r.sizes, 0)
}

type outDat struct {
Expand All @@ -38,50 +37,49 @@ type outDat struct {
i int
}

func (r FullReader) process(index int, offset int64, od *outDat, out chan *outDat) {
defer func() {
out <- od
}()
od.i = index
func (r FullReader) process(index int, offset int64, out chan outDat) {
var err error
var dat []byte
var rdr io.ReadCloser
size := realSize(r.sizes[index])
if size == 0 {
od.err = nil
od.data = make([]byte, r.blockSize)
out <- outDat{
i: index,
err: nil,
data: make([]byte, r.blockSize),
}
return
}
// rdr := io.LimitReader(toreader.NewReader(r.r, offset), int64(size))
if size == r.sizes[index] {
if dec, ok := r.d.(decompress.Decoder); ok {
dat := make([]byte, size)
_, od.err = r.r.ReadAt(dat, offset)
if od.err != nil {
return
dat = make([]byte, size)
_, err = r.r.ReadAt(dat, offset)
if err == nil {
dat, err = dec.Decode(dat, int(r.blockSize))
}
} else {
rdr, err = r.d.Reader(io.LimitReader(toreader.NewReader(r.r, offset), int64(size)))
if err == nil {
dat, err = io.ReadAll(rdr)
}
od.data, od.err = dec.Decode(dat, int(r.blockSize))
return
}
var rdr io.ReadCloser
rdr, od.err = r.d.Reader(io.LimitReader(toreader.NewReader(r.r, offset), int64(size)))
if od.err != nil {
return
}
od.data = make([]byte, r.blockSize)
var read int
read, od.err = rdr.Read(od.data)
od.data = od.data[:read]
rdr.Close()
} else {
od.data = make([]byte, size)
_, od.err = r.r.ReadAt(od.data, offset)
dat = make([]byte, size)
_, err = r.r.ReadAt(dat, offset)
}
out <- outDat{
i: index,
err: err,
data: dat,
}
if clr, ok := rdr.(io.Closer); ok {
clr.Close()
}
}

func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
pol := &sync.Pool{
New: func() any {
return new(outDat)
},
}
out := make(chan *outDat, len(r.sizes))
out := make(chan outDat, len(r.sizes))
offset := r.start
num := len(r.sizes)
start := off / int64(r.blockSize)
Expand All @@ -101,42 +99,40 @@ func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
offset += uint64(realSize(r.sizes[i]))
continue
}
od := pol.Get().(*outDat)
if i == num-1 && r.fragRdr != nil {
go func() {
defer func() {
out <- od
}()
rdr, e := r.fragRdr()
if err != nil {
od.i = num - 1
od.err = e
out <- outDat{
i: num - 1,
err: e,
}
return
}
od.data = make([]byte, r.sizes[num-1])
_, e = rdr.Read(od.data)
od.i = num - 1
od.err = e
dat, e := io.ReadAll(rdr)
out <- outDat{
i: num - 1,
err: e,
data: dat,
}
if clr, ok := rdr.(io.Closer); ok {
clr.Close()
}
}()
continue
}
go r.process(i, int64(offset), od, out)
go r.process(i, int64(offset), out)
offset += uint64(realSize(r.sizes[i]))
}
cur := start
cache := make(map[int]outDat)
for dat := range out {
for cur := start; cur < int64(end); {
dat := <-out
if dat.err != nil {
err = dat.err
pol.Put(dat)
return
}
if dat.i != int(cur) {
cache[dat.i] = *dat
pol.Put(dat)
cache[dat.i] = dat
continue
}
if cur == start {
Expand All @@ -147,18 +143,16 @@ func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
}
n += len(dat.data)
cur++
pol.Put(dat)
var ok bool
var curDat outDat
for {
curDat, ok = cache[int(cur)]
dat, ok = cache[int(cur)]
if !ok {
break
}
for i := range curDat.data {
p[n+i] = curDat.data[i]
for i := range dat.data {
p[n+i] = dat.data[i]
}
n += len(curDat.data)
n += len(dat.data)
cur++
delete(cache, int(cur))
}
Expand All @@ -170,94 +164,65 @@ func (r FullReader) ReadAt(p []byte, off int64) (n int, err error) {
}

func (r FullReader) WriteTo(w io.Writer) (n int64, err error) {
pol := &sync.Pool{
New: func() any {
return new(outDat)
},
}
out := make(chan *outDat, len(r.sizes))
out := make(chan outDat, len(r.sizes))
offset := r.start
num := len(r.sizes)
for i := 0; i < num; i++ {
od := pol.Get().(*outDat)
if i == num-1 && r.fragRdr != nil {
go func() {
defer func() {
out <- od
}()
rdr, e := r.fragRdr()
if err != nil {
od.i = num - 1
od.err = e
out <- outDat{
i: num - 1,
err: e,
}
return
}
buf := make([]byte, r.sizes[num-1])
_, e = rdr.Read(buf)
od.i = num - 1
od.err = e
od.data = buf
dat, e := io.ReadAll(rdr)
out <- outDat{
i: num - 1,
err: e,
data: dat,
}
if clr, ok := rdr.(io.Closer); ok {
clr.Close()
}
}()
continue
}
go r.process(i, int64(offset), od, out)
go r.process(i, int64(offset), out)
offset += uint64(realSize(r.sizes[i]))
}
wt, ok := w.(io.WriterAt)
if !ok {
var cur int
cache := make(map[int]outDat)
var tmpN int
var dat *outDat
for cur < len(r.sizes) {
dat = <-out
defer pol.Put(dat)
if dat.err != nil {
err = dat.err
return
}
if dat.i != cur {
cache[dat.i] = *dat
continue
cache := make(map[int]outDat)
var tmpN int
for cur := 0; cur < num; {
dat := <-out
if dat.err != nil {
err = dat.err
return
}
if dat.i != cur {
cache[dat.i] = dat
continue
}
tmpN, err = w.Write(dat.data)
n += int64(tmpN)
if err != nil {
return
}
cur++
var ok bool
for {
dat, ok = cache[cur]
if !ok {
break
}
tmpN, err = w.Write(dat.data)
n += int64(tmpN)
if err != nil {
return
}
cur++
var ok bool
var curDat outDat
for {
curDat, ok = cache[cur]
if !ok {
break
}
tmpN, err = w.Write(curDat.data)
n += int64(tmpN)
if err != nil {
return
}
cur++
}
}
} else {
var done int
var dat *outDat
for done < len(r.sizes) {
dat = <-out
defer pol.Put(dat)
if dat.err != nil {
err = dat.err
return
}
_, err = wt.WriteAt(dat.data, int64(dat.i*int(r.blockSize)))
if err != nil {
return
}
done++
}
}
return
Expand Down
2 changes: 1 addition & 1 deletion reader_inode.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r Reader) getReaders(i inode.Inode) (full *data.FullReader, rdr *data.Read
}
fragRdr = io.LimitReader(fragRdr, int64(fragSize))
return fragRdr, nil
}, fragSize)
})
var fragRdr io.Reader
fragRdr, err = r.fragReader(fragInd)
if err != nil {
Expand Down

0 comments on commit 089ef53

Please sign in to comment.