diff --git a/oldNode.go b/oldNode.go index 3555449..0b5fcce 100644 --- a/oldNode.go +++ b/oldNode.go @@ -147,15 +147,22 @@ func receiveFile(ctx context.Context, nodeName string) { // Create a buffer stream for non blocking read and write. rw := bufio.NewReader(stream) - f, err := os.Create("nodes/saveFilePath.txt") + generatedName := fmt.Sprintf("nodes/%s.part", util.RandString(10)) + f, err := os.Create(generatedName) if err != nil { log.Println(err) return } - go util.StreamToFile(rw, f) - - // 'stream' will stay open until you close it (or the other side closes it). + receivedName := util.StreamToFile(rw, f) + log.Println(receivedName) + if receivedName != "" { + receivedName = filepath.Base(receivedName) + err = os.Rename(generatedName, "nodes/"+receivedName) + if err != nil { + log.Println(err) + } + } }) f, err := os.Open(nodeDir) diff --git a/pkg/util/util.go b/pkg/util/util.go index 23559fd..65f398b 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -2,10 +2,14 @@ package util import ( "bufio" + "encoding/binary" "fmt" "io" "log" + "math/rand" "os" + "time" + "unsafe" "github.com/Azanul/peer-pressure/pkg/pressure/pb" "google.golang.org/protobuf/proto" @@ -32,17 +36,28 @@ func AppendStringToFile(path string, content string) { } } -func StreamToFile(rw *bufio.Reader, file *os.File) { +func StreamToFile(rw *bufio.Reader, file *os.File) (filename string) { writer := bufio.NewWriter(file) + lenBytes := make([]byte, 4) for { - str, err := io.ReadAll(rw) - if err == io.EOF || len(str) == 0 { + n, err := rw.Read(lenBytes) + if err == io.EOF || n == 0 { log.Printf("%s done writing", file.Name()) break } else if err != nil { fmt.Println("Error reading from buffer") panic(err) } + messageSize := binary.BigEndian.Uint32(lenBytes) + + str := make([]byte, messageSize) + _, err = io.ReadFull(rw, str) + if err != nil { + fmt.Println("Error reading from buffer") + panic(err) + } + log.Println(n, len(str), str) + chunk := pb.Chunk{} err = proto.Unmarshal(str, &chunk) if err != nil { @@ -50,10 +65,21 @@ func StreamToFile(rw *bufio.Reader, file *os.File) { panic(err) } - log.Println(str) log.Println(chunk.Data) + log.Println(*chunk.Len) + log.Println(*chunk.Filename) log.Println() - _, err = writer.Write(chunk.Data) + if chunk.Filename != nil { + filename = *chunk.Filename + } + + var data []byte + if chunk.Len == nil { + data = chunk.Data + } else { + data = chunk.Data[:*chunk.Len] + } + _, err = writer.Write(data) if err != nil { log.Println(err) } @@ -62,14 +88,17 @@ func StreamToFile(rw *bufio.Reader, file *os.File) { if err != nil { log.Panic(err) } + return } func FileToStream(rw *bufio.Writer, file *os.File) { data := make([]byte, chunkSize) + filename := file.Name() var partNum int32 = 0 for { - _, err := file.Read(data) + n, err := file.Read(data) + lenData := int32(n) if err == io.EOF { break } else if err != nil { @@ -79,20 +108,33 @@ func FileToStream(rw *bufio.Writer, file *os.File) { partNum++ chunk := &pb.Chunk{ - Index: partNum, - Data: data, + Index: partNum, + Data: data, + Filename: &filename, + Len: &lenData, } sendData, err := proto.Marshal(chunk) if err != nil { log.Println("Error marshaling proto chunk") panic(err) } + + messageSize := uint32(len(sendData)) + messageSizeBytes := make([]byte, 4) + binary.BigEndian.PutUint32(messageSizeBytes, messageSize) + + _, err = rw.Write(messageSizeBytes) + if err != nil { + log.Println("Error writing size prefix to buffer") + panic(err) + } + _, err = rw.Write(sendData) if err != nil { log.Println("Error writing to buffer") panic(err) } - log.Println(rw.Available()) + log.Println(lenData, len(sendData), sendData) err = rw.Flush() if err != nil { log.Println("Error flushing buffer") @@ -100,3 +142,30 @@ func FileToStream(rw *bufio.Writer, file *os.File) { } } } + +const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" +const ( + letterIdxBits = 6 // 6 bits to represent a letter index + letterIdxMask = 1<= 0; { + if remain == 0 { + cache, remain = src.Int63(), letterIdxMax + } + if idx := int(cache & letterIdxMask); idx < len(letterBytes) { + b[i] = letterBytes[idx] + i-- + } + cache >>= letterIdxBits + remain-- + } + + return *(*string)(unsafe.Pointer(&b)) +}