Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account for job start/end time not exactly matching forecast data points #54

Merged
merged 23 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2bb2a01
Compute window_size in WindowedForecast __init__
tlestang Jul 22, 2023
8b3b5fe
Add function to interpolation intensity value
tlestang Jul 23, 2023
f6251b3
Interpolate start and end CI for each potential duration
tlestang Jul 23, 2023
d876e4f
Work with nb of data points instead of nb of intervals
tlestang Jul 23, 2023
b2d5488
Use data list directly instead of times and intensities
tlestang Jul 23, 2023
1158bad
test: case where job start/end dont match data points
tlestang Jul 23, 2023
60a29f0
Change a few variable names and layout for readability
tlestang Jul 23, 2023
1c14f0c
Add docstring for interp method
tlestang Jul 23, 2023
93be87c
Move interp method below __getitem__
tlestang Jul 23, 2023
22bb8b7
Remove import of unused ceil function
tlestang Jul 23, 2023
925e6c9
Dont need job duration as WindowedForecast attribute
tlestang Jul 23, 2023
51bc336
Cannot use 'key' param of bisect for python 3.9
tlestang Jul 23, 2023
b7dc142
Fix intensity value at window boundaries instead of midpoints
tlestang Jul 24, 2023
50ff2cc
test: job with duration smaller than time between data points
tlestang Jul 24, 2023
9f61ba8
Remove commented pdb call
tlestang Jul 26, 2023
02ed744
interp function returns a CarbonPointEstimate instance
tlestang Jul 27, 2023
afe12b4
Weight midpoints with interpoint distance
tlestang Jul 27, 2023
b26841e
test: account for weighted midpoints
tlestang Jul 27, 2023
62d1255
test: add a second test with a job spanning a few data points
tlestang Jul 27, 2023
a223b1d
Merge main into adjust_integration_window branch
tlestang Jul 27, 2023
6930399
test: Don't truncate interpolated intensity values
tlestang Jul 28, 2023
db76c19
Dont assume start time falls withing first data interval
tlestang Jul 28, 2023
50319ee
Merge branch 'main' into adjust_integration_window
tlestang Aug 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 96 additions & 19 deletions cats/forecast.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timedelta


@dataclass(order=True)
Llannelongue marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -36,37 +36,114 @@ def __post_init__(self):

class WindowedForecast:

def __init__(self, data: list[CarbonIntensityPointEstimate], window_size: int):
self.times = [point.datetime for point in data]
self.intensities = [point.value for point in data]
# Integration window size in number of time intervals covered
# by the window.
self.window_size = window_size
def __init__(
self,
data: list[CarbonIntensityPointEstimate],
duration: int, # in minutes
start: datetime,
):
self.data_stepsize = data[1].datetime - data[0].datetime
Llannelongue marked this conversation as resolved.
Show resolved Hide resolved
self.start = start
# TODO: Expect duration as a timedelta directly
self.end = start + timedelta(minutes=duration)

# Restrict data points so that start time falls within the
# first data interval. In other we don't need any data prior
# the closest data preceding (on the left of) the job start
# time.
def bisect_right(data, t):
for i, d in enumerate(data):
if d.datetime > t:
return i - 1
# bisect_right(data, start) returns the index of the first
# data point with datetime value immediately preceding the job
# start time
self.data = data[bisect_right(data, start):]

# Find number of data points in a window, by finding the index
# of the closest data point past the job end time. Could be
# done with the bisect module in the stdlib for python 3.10+
# ('key' parameter was introduced in 3.10).
#
# bisect_left(data, self.end, key=lambda x: x.datetime)
#
def bisect_left(data, t):
for i, d in enumerate(data):
if d.datetime >= t:
return i
self.ndata = bisect_left(self.data, self.end) + 1

def __getitem__(self, index: int) -> CarbonIntensityAverageEstimate:
"""Return the average of timeseries data from index over the
window size. Data points are integrated using the trapeziodal
rule, that is assuming that forecast data points are joined
with a straight line.

Integral value between two points is the intensity value at
the midpoint times the duration between the two points. This
duration is assumed to be unity and the average is computed by
dividing the total integral value by the number of intervals.
"""
v = [ # If you think of a better name, pls help!
0.5 * (a + b)
for a, b in zip(
self.intensities[index: index + self.window_size],
self.intensities[index + 1 : index + self.window_size + 1]
)]

# Account for the fact that the start and end of each window
# might not fall exactly on data points. The starting
# intensity is interpolated between the first (index) and
# second data point (index + 1) in the window. The ending
# intensity value is interpolated between the last and
# penultimate data points in he window.
window_start = self.start + index * self.data_stepsize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One potential downside of doing all the calculations in the __getitem__ method is that it requires redoing calculations every time (I think?): as in, min already calculates all windows, but then w[0] does it again etc.. An alternative would be to calculate all windows once, store it in self and only access it after that. Or are there performance benefits to doing everything here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I think performance should not be a concern here. Even then, only the the first window is computed twice. But yes, if you were to run min twice, you'd compute all windows twice.

window_end = self.end + index * self.data_stepsize
lbound = self.interp(
self.data[index],
self.data[index + 1],
when=window_start,
)
rbound = self.interp(
self.data[index + self.ndata - 2],
self.data[index + self.ndata - 1],
when=window_end,
)
# window_data <- [lbound] + [...bulk...] + [rbound] where
# lbound and rbound are interpolated intensity values.
window_data = (
[lbound] +
self.data[index + 1: index + self.ndata - 1] +
[rbound]
)
acc = [
0.5 * (a.value + b.value) * (b.datetime - a.datetime).total_seconds()
for a, b in zip(window_data[:-1], window_data[1:])
]
duration = window_data[-1].datetime - window_data[0].datetime
return CarbonIntensityAverageEstimate(
start=self.times[index],
# Note that `end` points to the _start_ of the last
# interval in the window.
end=self.times[index + self.window_size],
value=sum(v) / self.window_size,
start=window_start,
end=window_end,
value=sum(acc) / duration.total_seconds(),
)

@staticmethod
def interp(
p1: CarbonIntensityPointEstimate,
p2: CarbonIntensityPointEstimate,
when: datetime
) -> CarbonIntensityPointEstimate:
"""Return carbon intensity pt estimate at a time between data
points, assuming points are joined by a straight line (linear
interpolation).
"""
timestep = (p2.datetime - p1.datetime).total_seconds()

slope = (p2.value - p1.value) / timestep
offset = (when - p1.datetime).total_seconds()

return CarbonIntensityPointEstimate(
value=p1.value + slope * offset,
datetime=when,
)

def __iter__(self):
for index in range(self.__len__()):
yield self.__getitem__(index)

def __len__(self):
return len(self.times) - self.window_size - 1
return len(self.data) - self.ndata
12 changes: 2 additions & 10 deletions cats/optimise_starttime.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from math import ceil
from datetime import datetime
from .forecast import WindowedForecast


Expand All @@ -16,13 +16,5 @@ def get_avg_estimates(data, duration=None):
# raise ValueError(
# "Windowed method timespan cannot be greater than the cached timespan"
# )

# get length of interval between timestamps
interval = (
data[1].datetime - data[0].datetime
).total_seconds() / 60
wf = WindowedForecast(
data=data,
window_size=ceil(duration / interval)
)
wf = WindowedForecast(data, duration, start=datetime.now())
return wf[0], min(wf)
131 changes: 127 additions & 4 deletions tests/test_windowed_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

def test_has_right_length():
window_size = 160 # In number of time intervals
wf = WindowedForecast(DATA, window_size)
wf = WindowedForecast(DATA, window_size, start=DATA[0].datetime)

# Expecting (200 - 160 - 1) (39) data points in the time
# integrated timeseries.
Expand All @@ -39,7 +39,7 @@ def test_values():
# a step size `step` small compared the the integration window

window_size = 160
wf = WindowedForecast(DATA, window_size)
wf = WindowedForecast(DATA, window_size, start=DATA[0].datetime)
expected = [

math.cos((i + window_size) * step) - math.cos(i * step)
Expand Down Expand Up @@ -68,7 +68,9 @@ def test_minimise_average():
]

window_size = 6
result = min(WindowedForecast(data, window_size))
# Data points separated by 30 minutes intervals
duration = window_size * 30
result = min(WindowedForecast(data, duration, start=data[0].datetime))

# Intensity point estimates over best runtime period
v = [10, 8, 7, 7, 5, 8, 8]
Expand All @@ -95,7 +97,9 @@ def test_average_intensity_now():
]

window_size = 11
result = WindowedForecast(data, window_size)[0]
# Data points separated by 30 minutes intervals
duration = window_size * 30
result = WindowedForecast(data, duration, start=data[0].datetime)[0]

# Intensity point estimates over best runtime period
v = [p.value for p in data[:window_size + 1]]
Expand All @@ -107,3 +111,122 @@ def test_average_intensity_now():
) / window_size
)
assert (result == expected)


def test_average_intensity_with_offset():
# Case where job start and end time are not colocated with data
# carbon intensity data points. In this case cats interpolate the
# intensity value at beginning and end of each potential job
# duration window.
CI_forecast = [
CarbonIntensityPointEstimate(datetime(2023,1,1,8,30), value=26),
CarbonIntensityPointEstimate(datetime(2023,1,1,9,0), value=40),
CarbonIntensityPointEstimate(datetime(2023,1,1,9,30), value=50),
CarbonIntensityPointEstimate(datetime(2023,1,1,10,0), value=60),
CarbonIntensityPointEstimate(datetime(2023,1,1,10,30), value=25),
]
duration = 70 # in minutes
# First available data point is for 08:00 but the job
# starts 15 minutes later.
job_start = datetime.fromisoformat("2023-01-01T08:45")
result = WindowedForecast(CI_forecast, duration, start=job_start)[1]
Llannelongue marked this conversation as resolved.
Show resolved Hide resolved

interp1 = 40 + 15 * (50 - 40) / 30
interp2 = 60 + 25 * (25 - 60) / 30
expected = CarbonIntensityAverageEstimate(
start=datetime(2023,1,1,9,15),
end=datetime(2023,1,1,10,25),
value=(
0.5 * (interp1 + 50) * 15 +
0.5 * (50 + 60) * 30 +
0.5 * (60 + interp2) * 25
) / duration
)
assert result == expected

# Test that the WindowedForecast is able to work with a job start
# beyond the first data interval. Not technically required when
# working with carbonintensity.org.uk, but useful generalisation
# nontheless.

# When start at 09:15 the WindowedForecast's 'data' attribute
# should discard the first data point at 08:30.
job_start = datetime.fromisoformat("2023-01-01T09:15")
result = WindowedForecast(CI_forecast, duration, start=job_start)[0]
assert result == expected

def test_average_intensity_with_offset_long_job():
# Case where job start and end time are not colocated with data
# carbon intensity data points. In this case cats interpolate the
# intensity value at beginning and end of each potential job
# duration window.
with open(TEST_DATA, "r") as f:
csvfile = csv.reader(f, delimiter=",")
next(csvfile) # Skip header line
data = [
CarbonIntensityPointEstimate(
datetime=datetime.fromisoformat(datestr[:-1]),
value=float(intensity_value),
)
for datestr, _, _, intensity_value in csvfile
]

duration = 194 # in minutes
# First available data point is for 12:30 but the job
# starts 18 minutes later.
job_start = datetime.fromisoformat("2023-05-04T12:48")
result = WindowedForecast(data, duration, start=job_start)[2]

# First and last element in v are interpolated intensity value.
# e.g v[0] = 15 + 18min * (18 - 15) / 30min = 16.8
v = [16.8, 18, 19, 17, 16, 11, 11, 11, 11]
data_timestep = data[1].datetime - data[0].datetime # 30 minutes
expected = CarbonIntensityAverageEstimate(
start=job_start + 2 * data_timestep,
end=job_start + 2 * data_timestep + timedelta(minutes=duration),
value=(
0.5 * (v[0] + v[1]) * 12 +
sum([0.5 * (a + b) * 30 for a, b in zip(v[1:-2], v[2:-1])]) +
0.5 * (v[7] + v[8]) * 2
) / duration
)
assert (result == expected)

def test_average_intensity_with_offset_short_job():
# Case where job is short: start and end time fall between two
# consecutive data points (first and second).
with open(TEST_DATA, "r") as f:
csvfile = csv.reader(f, delimiter=",")
next(csvfile) # Skip header line
data = [
CarbonIntensityPointEstimate(
datetime=datetime.fromisoformat(datestr[:-1]),
value=float(intensity_value),
)
for datestr, _, _, intensity_value in csvfile
]

duration = 6 # in minutes
# First available data point is for 12:30 but the job
# starts 6 minutes later.
job_start = datetime.fromisoformat("2023-05-04T12:48")
result = WindowedForecast(data, duration, start=job_start)[2]

# Job starts at 12:48 and ends at 12:54. For each candidate
# running window, both start and end times fall between two
# consecutive data points (e.g. 13:30 and 14:00 for the third
# window).
#
# First and second element in v are interpolated intensity
# values. e.g v[0] = 15 + 18min * (18 - 15) / 30min = 16.8
# and v[1] = v[-1] = 15 + 24min * (18 - 15) / 30min = 17.4
v = [16.8, 17.4]
data_timestep = data[1].datetime - data[0].datetime
expected = CarbonIntensityAverageEstimate(
start=job_start + 2 * data_timestep,
end=job_start + 2 * data_timestep + timedelta(minutes=duration),
value=sum(
[0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]
) / (len(v) - 1)
)
assert (result == expected)