Skip to content

Commit

Permalink
mkv data
Browse files Browse the repository at this point in the history
  • Loading branch information
itsjoshzhang committed Jun 23, 2024
1 parent 0655756 commit 5f6880b
Showing 1 changed file with 90 additions and 68 deletions.
158 changes: 90 additions & 68 deletions benchmarking/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
import time
import pickle
import shutil
import numpy as np
from logging import getLogger

Expand Down Expand Up @@ -37,9 +38,9 @@ def export(self, loader, path):
raise NotImplementedError


class TFLoader(BaseLoader):
class RTXLoader(BaseLoader):
def __init__(self, path, split):
super(TFLoader, self).__init__(path)
super(RTXLoader, self).__init__(path)

builder = tfds.builder_from_directory(path)
self.ds = builder.as_dataset(split)
Expand All @@ -51,83 +52,85 @@ def __iter__(self):
return self.ds.__iter__()


class TFWriter(BaseWriter):
class RTXWriter(BaseWriter):
def __init__(self):
super(TFWriter, self).__init__()
super(RTXWriter, self).__init__()
self.wr = tf.io.TFRecordWriter

def export(self, loader, path):
for i, data in enumerate(loader):
for i, data in enumerate(loader):
data_list = list(data["steps"].as_numpy_iterator())
data_flat = flatten(data_list)

with self.wr(f"{path}/TFWriter_{i}.tfrecord") as writer:
for dl in data_list:
with self.wr(f"{path}/output_{i}.tfrecord") as writer:
for dl in data_flat:
writer.write(dl)

def export_temp(self, data_list, path):
path += "/temp.tfrecord"

with self.wr(path) as writer:
for dl in data_list:
def export_temp(self, data_flat, path):
with self.wr(path + "/temp.tfrecord") as writer:
for dl in data_flat:
writer.write(dl)
os.remove(path)


N = 51
MB = 1024 * 1024

def get_size(obj):
if isinstance(obj, dict):
obj = list(obj.values())
if isinstance(obj, list):
return sum(get_size(v) for v in obj)
elif isinstance(obj, np.ndarray):
return obj.nbytes
if isinstance(obj, bytes):
return len(obj)
if np.isscalar(obj):
return obj.itemsize
raise TypeError(type(obj))


def iterate_dataset(loader, writer, n):
def flatten(data):
if isinstance(data, dict):
data = list(data.values())
if isinstance(data, list):
return [x for lst in data for x in flatten(lst)]
if isinstance(data, bytes):
return [data]
return [data.tobytes()]


def iterate_rtx(loader, writer, n):
read_time, write_time, data_size = 0, 0, 0

for i, data in enumerate(loader):
for i, data in enumerate(loader):
print(f"Measuring trajectory {i}")

stop = time.time()
data_list = list(data["steps"].as_numpy_iterator())
read_time += time.time() - stop

data_size += get_size(data_list)
name = "berkeley_autolab_ur5"
num = f"{str(i).zfill(5)}-of-00412"
data_size += os.path.getsize(f"{PATH}/{name}/{name}-train.tfrecord-{num}")

data_flat = flatten(data_list)
stop = time.time()
writer.export_temp(data_list, PATH)
writer.export_temp(data_flat, PATH)
write_time += time.time() - stop

if i == n: break
return read_time, write_time, data_size / MB


tf_loader = TFLoader(PATH + "berkeley_autolab_ur5", f"train[:{N}]")
tf_writer = TFWriter()
iterate_dataset(tf_loader, N)
print("do we even get here")
exit()
rtx_loader = RTXLoader(PATH + "berkeley_autolab_ur5", f"train[:{N}]")
rtx_writer = RTXWriter()
# rt, wt, mb = iterate_rtx(rtx_loader, rtx_writer, N)

class MKVExporter(BaseWriter):
# print(f"\nRTX Data: \nMem. size = {mb:.4f} MB; Num. traj = {N}")
# print(f"Read: latency = {rt:.4f} s; throughput = {mb / rt :.4f} MB/s, {N / rt :.4f} traj/s")
# print(f"Write: latency = {wt:.4f} s; throughput = {mb / wt :.4f} MB/s, {N / wt :.4f} traj/s")


class MVKWriter(BaseWriter):
def __init__(self):
super(MKVExporter, self).__init__()
super(MVKWriter, self).__init__()

# Function to create a frame from numpy array
def create_frame(self, image_array, stream):
frame = av.VideoFrame.from_ndarray(np.array(image_array), format='rgb24')
def create_frame(self, image, stream):
frame = av.VideoFrame.from_ndarray(np.array(image), format='rgb24')
frame.pict_type = 'NONE'
frame.time_base = stream.time_base
return frame

# Function to create a frame from numpy array
def create_frame_depth(self, image_array, stream):
image_array = np.array(image_array)
def create_frame_depth(self, image, stream):
image_array = np.array(image)
# if float, convert to uint8
if image_array.dtype == np.float32:
image_array = (image_array * 255).astype(np.uint8)
Expand All @@ -139,13 +142,14 @@ def create_frame_depth(self, image_array, stream):
frame.time_base = stream.time_base
return frame

def export(self, loader: BaseLoader, output_path: str):
def export(self, loader, path):
# Create an output container
i = -1
for traj_tensor in loader:
i += 1
trajectory = dict(traj_tensor)
output = av.open(f'{output_path}/output_{i}.mkv', mode='w')
output = av.open(f'{path}/output_{i}.mkv', mode='w')

# Define video streams (assuming images are 640x480 RGB)
video_stream_1 = output.add_stream('libx264', rate=1)
video_stream_1.width = 640
Expand All @@ -159,33 +163,30 @@ def export(self, loader: BaseLoader, output_path: str):

# Define custom data stream for vectors
depth_stream = output.add_stream('libx264', rate=1)

data_stream = output.add_stream('rawvideo', rate=1)

ts = 0
# convert step data to stream
for step_tensor in trajectory["steps"]:
step = dict(step_tensor)
obesrvation = step_tensor["observation"].copy()
obesrvation.pop("image")
obesrvation.pop("hand_image")
obesrvation.pop("image_with_depth")
observation = step_tensor["observation"].copy()
observation.pop("image")
observation.pop("hand_image")
observation.pop("image_with_depth")
non_image_data_step = step.copy()
non_image_data_step["observation"] = obesrvation
non_image_data_step["observation"] = observation

non_image_data_bytes = pickle.dumps(non_image_data_step)
packet = av.Packet(non_image_data_bytes)
packet.stream = data_stream
packet.pts = ts
output.mux(packet)


image =np.array(step["observation"]["image"])
image = np.array(step["observation"]["image"])
# Create a frame from the numpy array
frame = self.create_frame(image, video_stream_1)
frame.pts = ts
packet = video_stream_1.encode(frame)

output.mux(packet)

hand_image =np.array(step["observation"]["hand_image"])
Expand All @@ -195,17 +196,15 @@ def export(self, loader: BaseLoader, output_path: str):
packet = video_stream_2.encode(frame)
output.mux(packet)

# # Create a frame from the numpy array
# Create a frame from the numpy array
frame = self.create_frame_depth(step["observation"]["image_with_depth"], depth_stream)
# frame.pts = ts
# Encode the frame
packet = depth_stream.encode(frame)
# Write the packet to the output file
output.mux(packet)

ts += 1


# Flush the remaining frames
for packet in video_stream_1.encode():
output.mux(packet)
Expand All @@ -219,9 +218,9 @@ def export(self, loader: BaseLoader, output_path: str):


class MKVLoader(BaseLoader):
def __init__(self, data_path):
super(MKVLoader, self).__init__(data_path)
self.files = [data_path + f for f in os.listdir(data_path) if f.endswith('.mkv')]
def __init__(self, path):
super(MKVLoader, self).__init__(path)
self.files = [path + f for f in os.listdir(path) if f.endswith('.mkv')]
self.index = 0

def __len__(self):
Expand All @@ -244,6 +243,7 @@ def __next__(self):
def _parse_mkv_file(self, filename):
print(filename)
input_container = av.open(filename)

video_stream1 = input_container.streams.video[0]
video_stream1.thread_type = 'AUTO'
video_stream2 = input_container.streams.video[1]
Expand Down Expand Up @@ -289,12 +289,34 @@ def _parse_mkv_file(self, filename):
input_container.close()


exporter = MKVExporter()
output_path = os.path.expanduser("~") + "/fog_x/examples/dataloader/mkv_output/"
exporter.export(rtx_loader, output_path)
mkv_loader = MKVLoader(output_path)
def iterate_dataset(loader: BaseLoader, number_of_samples = 50):
for i, data in enumerate(loader):
if i == number_of_samples:
break
iterate_dataset(mkv_loader, number_of_samples)
def iterate_mkv(rtx_loader, mkv_writer, n):
read_time, write_time, data_size = 0, 0, 0
path = PATH + "mkv_convert/"

if len(os.listdir(path)) != 0:
raise SystemError(f"{path} must be empty.")

stop = time.time()
mkv_writer.export(rtx_loader, path)
write_time += time.time() - stop

mkv_loader = MKVLoader(path)

stop = time.time()
for i, data in enumerate(mkv_loader):
print(f"Reading trajectory {i}")
if i == n: break
read_time += time.time() - stop

for i in range(n):
data_size += os.path.getsize(f"{path}/output_{i}.mkv")

return read_time, write_time, data_size / MB


mkv_writer = MVKWriter()
rt, wt, mb = iterate_mkv(rtx_loader, mkv_writer, N)

print(f"\nMKV Data: \nMem. size = {mb:.4f} MB; Num. traj = {N}")
print(f"Read: latency = {rt:.4f} s; throughput = {mb / rt :.4f} MB/s, {N / rt :.4f} traj/s")
print(f"Write: latency = {wt:.4f} s; throughput = {mb / wt :.4f} MB/s, {N / wt :.4f} traj/s")

0 comments on commit 5f6880b

Please sign in to comment.