Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account for job start/end time not exactly matching forecast data points #54

Merged
merged 23 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2bb2a01
Compute window_size in WindowedForecast __init__
tlestang Jul 22, 2023
8b3b5fe
Add function to interpolation intensity value
tlestang Jul 23, 2023
f6251b3
Interpolate start and end CI for each potential duration
tlestang Jul 23, 2023
d876e4f
Work with nb of data points instead of nb of intervals
tlestang Jul 23, 2023
b2d5488
Use data list directly instead of times and intensities
tlestang Jul 23, 2023
1158bad
test: case where job start/end dont match data points
tlestang Jul 23, 2023
60a29f0
Change a few variable names and layout for readability
tlestang Jul 23, 2023
1c14f0c
Add docstring for interp method
tlestang Jul 23, 2023
93be87c
Move interp method below __getitem__
tlestang Jul 23, 2023
22bb8b7
Remove import of unused ceil function
tlestang Jul 23, 2023
925e6c9
Dont need job duration as WindowedForecast attribute
tlestang Jul 23, 2023
51bc336
Cannot use 'key' param of bisect for python 3.9
tlestang Jul 23, 2023
b7dc142
Fix intensity value at window boundaries instead of midpoints
tlestang Jul 24, 2023
50ff2cc
test: job with duration smaller than time between data points
tlestang Jul 24, 2023
9f61ba8
Remove commented pdb call
tlestang Jul 26, 2023
02ed744
interp function returns a CarbonPointEstimate instance
tlestang Jul 27, 2023
afe12b4
Weight midpoints with interpoint distance
tlestang Jul 27, 2023
b26841e
test: account for weighted midpoints
tlestang Jul 27, 2023
62d1255
test: add a second test with a job spanning a few data points
tlestang Jul 27, 2023
a223b1d
Merge main into adjust_integration_window branch
tlestang Jul 27, 2023
6930399
test: Don't truncate interpolated intensity values
tlestang Jul 28, 2023
db76c19
Dont assume start time falls withing first data interval
tlestang Jul 28, 2023
50319ee
Merge branch 'main' into adjust_integration_window
tlestang Aug 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 73 additions & 17 deletions cats/forecast.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timedelta


@dataclass(order=True)
Llannelongue marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -36,37 +36,93 @@ def __post_init__(self):

class WindowedForecast:

def __init__(self, data: list[CarbonIntensityPointEstimate], window_size: int):
self.times = [point.datetime for point in data]
self.intensities = [point.value for point in data]
# Integration window size in number of time intervals covered
# by the window.
self.window_size = window_size
def __init__(
self,
data: list[CarbonIntensityPointEstimate],
duration: int, # in minutes
start: datetime,
):
self.data = data
self.data_stepsize = data[1].datetime - data[0].datetime
Llannelongue marked this conversation as resolved.
Show resolved Hide resolved
self.start = start
# TODO: Expect duration as a timedelta directly
self.end = start + timedelta(minutes=duration)

# Find number of data points in a window, by finding the index
# of the first data point past the job end time. Could be done
# with the bisect module in the stdlib for python 3.10+ ('key'
# parameter was introduced in 3.10).
#
# bisect_left(data, self.end, key=lambda x: x.datetime)
#
def bisect_left(data, t):
for i, d in enumerate(data):
if d.datetime >= t:
return i
self.ndata = bisect_left(data, self.end) + 1

def __getitem__(self, index: int) -> CarbonIntensityAverageEstimate:
"""Return the average of timeseries data from index over the
window size. Data points are integrated using the trapezoidal
rule, that is assuming that forecast data points are joined
with a straight line.

Integral value between two points is the intensity value at
the midpoint times the duration between the two points. This
duration is assumed to be unity and the average is computed by
dividing the total integral value by the number of intervals.
"""
v = [ # If you think of a better name, pls help!
0.5 * (a + b)
midpt = [
0.5 * (a.value + b.value)
for a, b in zip(
self.intensities[index: index + self.window_size],
self.intensities[index + 1 : index + self.window_size + 1]
self.data[index: index + self.ndata - 1],
self.data[index + 1: index + self.ndata]
)]

# Account for the fact that the start and end of each window
# might not fall exactly on data points. The starting
# intensity is interpolated between the first (index) and
# second data point (index + 1) in the window. The ending
# intensity value is interpolated between the last and
penultimate data points in the window.
start = self.start + index * self.data_stepsize
i = self.interp(self.data[index], self.data[index + 1], when=start)
midpt[0] = 0.5 * (i + self.data[index + 1].value)

end = self.end + index * self.data_stepsize
i = self.interp(
self.data[index + self.ndata - 2],
self.data[index + self.ndata - 1],
when=end,
)
midpt[-1] = 0.5 * (self.data[index + self.ndata - 2].value + i)

return CarbonIntensityAverageEstimate(
start=self.times[index],
# Note that `end` points to the _start_ of the last
# interval in the window.
end=self.times[index + self.window_size],
value=sum(v) / self.window_size,
start=start,
end=end,
value=sum(midpt) / (self.ndata - 1),
)

@staticmethod
def interp(
    p1: CarbonIntensityPointEstimate,
    p2: CarbonIntensityPointEstimate,
    when: datetime
):
    """Return value of carbon intensity at a time between data
    points, assuming points are joined by a straight line (linear
    interpolation).

    :param p1: Data point at or before ``when``.
    :param p2: Data point at or after ``when``.
    :param when: Time at which to estimate the carbon intensity.
    :return: Interpolated intensity value at time ``when``.
    """
    # Time between the two data points, in seconds. Assumes
    # p2.datetime > p1.datetime, so timestep is non-zero.
    timestep = (p2.datetime - p1.datetime).total_seconds()

    # Slope in intensity units per second.
    slope = (p2.value - p1.value) / timestep
    # Seconds elapsed from p1 to the requested time.
    offset = (when - p1.datetime).total_seconds()
    return p1.value + slope * offset  # Value at t = when
tlestang marked this conversation as resolved.
Show resolved Hide resolved

def __iter__(self):
for index in range(self.__len__()):
yield self.__getitem__(index)

def __len__(self):
return len(self.times) - self.window_size - 1
return len(self.data) - self.ndata
10 changes: 2 additions & 8 deletions cats/optimise_starttime.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from math import ceil
from datetime import datetime
from .forecast import WindowedForecast


Expand Down Expand Up @@ -28,11 +28,5 @@ def get_avg_estimates(data, method="simple", duration=None):

if method == "windowed":
# get length of interval between timestamps
interval = (
data[1].datetime - data[0].datetime
).total_seconds() / 60
wf = WindowedForecast(
data=data,
window_size=ceil(duration / interval)
)
wf = WindowedForecast(data, duration, start=datetime.now())
return wf[0], min(wf)
49 changes: 45 additions & 4 deletions tests/test_windowed_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

def test_has_right_length():
window_size = 160 # In number of time intervals
wf = WindowedForecast(DATA, window_size)
wf = WindowedForecast(DATA, window_size, start=DATA[0].datetime)

# Expecting (200 - 160 - 1) (39) data points in the time
# integrated timeseries.
Expand All @@ -39,7 +39,7 @@ def test_values():
# a step size `step` small compared the the integration window

window_size = 160
wf = WindowedForecast(DATA, window_size)
wf = WindowedForecast(DATA, window_size, start=DATA[0].datetime)
expected = [

math.cos((i + window_size) * step) - math.cos(i * step)
Expand Down Expand Up @@ -68,7 +68,9 @@ def test_minimise_average():
]

window_size = 6
result = min(WindowedForecast(data, window_size))
# Data points separated by 30 minutes intervals
duration = window_size * 30
result = min(WindowedForecast(data, duration, start=data[0].datetime))

# Intensity point estimates over best runtime period
v = [10, 8, 7, 7, 5, 8, 8]
Expand All @@ -95,7 +97,9 @@ def test_average_intensity_now():
]

window_size = 11
result = WindowedForecast(data, window_size)[0]
# Data points separated by 30 minutes intervals
duration = window_size * 30
result = WindowedForecast(data, duration, start=data[0].datetime)[0]

# Intensity point estimates over best runtime period
v = [p.value for p in data[:window_size + 1]]
Expand All @@ -107,3 +111,40 @@ def test_average_intensity_now():
) / window_size
)
assert (result == expected)


def test_average_intensity_with_offset():
# Case where job start and end time are not colocated with data
# carbon intensity data points. In this case cats interpolate the
# intensity value at beginning and end of each potential job
# duration window.
with open(TEST_DATA, "r") as f:
csvfile = csv.reader(f, delimiter=",")
next(csvfile) # Skip header line
data = [
CarbonIntensityPointEstimate(
datetime=datetime.fromisoformat(datestr[:-1]),
value=float(intensity_value),
)
for datestr, _, _, intensity_value in csvfile
]

duration = 194 # in minutes
# First available data point is for 12:30 but the job
# starts 18 minutes later.
job_start = datetime.fromisoformat("2023-05-04T12:48")
result = WindowedForecast(data, duration, start=job_start)[2]

# First and last element in v are interpolated intensity value.
# e.g v[0] = 15 + 18min * (18 - 15) / 30min = 16.8
v = [16.8, 18, 19, 17, 16, 11, 11, 11, 11]
data_timestep = data[1].datetime - data[0].datetime
expected = CarbonIntensityAverageEstimate(
start=job_start + 2 * data_timestep,
end=job_start + 2 * data_timestep + timedelta(minutes=duration),
value=sum(
[0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]
) / (len(v) - 1)
)
assert (result == expected)