diff --git a/cats/forecast.py b/cats/forecast.py
index 8a6643b..f37fa28 100644
--- a/cats/forecast.py
+++ b/cats/forecast.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from datetime import datetime
+from datetime import datetime, timedelta
 
 
 @dataclass(order=True)
@@ -27,32 +27,109 @@ class CarbonIntensityAverageEstimate:
 
 
 class WindowedForecast:
 
-    def __init__(self, data: list[CarbonIntensityPointEstimate], window_size: int):
-        self.times = [point.datetime for point in data]
-        self.intensities = [point.value for point in data]
-        # Integration window size in number of time intervals covered
-        # by the window.
-        self.window_size = window_size
+    def __init__(
+            self,
+            data: list[CarbonIntensityPointEstimate],
+            duration: int,  # in minutes
+            start: datetime,
+    ):
+        self.data_stepsize = data[1].datetime - data[0].datetime
+        self.start = start
+        # TODO: Expect duration as a timedelta directly
+        self.end = start + timedelta(minutes=duration)
+
+        # Restrict data points so that the start time falls within the
+        # first data interval. In other words, we don't need any data
+        # prior to the closest data point preceding (on the left of)
+        # the job start time.
+        def bisect_right(data, t):
+            for i, d in enumerate(data):
+                if d.datetime > t:
+                    return i - 1
+        # bisect_right(data, start) returns the index of the data
+        # point whose datetime immediately precedes the job start
+        # time
+        self.data = data[bisect_right(data, start):]
+
+        # Find the number of data points in a window, by finding the
+        # index of the closest data point past the job end time. Could
+        # be done with the bisect module in the stdlib for python 3.10+
+        # ('key' parameter was introduced in 3.10).
+        #
+        # bisect_left(data, self.end, key=lambda x: x.datetime)
+        #
+        def bisect_left(data, t):
+            for i, d in enumerate(data):
+                if d.datetime >= t:
+                    return i
+        self.ndata = bisect_left(self.data, self.end) + 1
 
     def __getitem__(self, index: int) -> CarbonIntensityAverageEstimate:
         """Return the average of timeseries data from index over the
         window size. Data points are integrated using the trapeziodal
         rule, that is assuming that forecast data points are joined
         with a straight line.
+
+        Integral value between two points is the intensity value at
+        the midpoint times the duration between the two points. The
+        average is the total integral value divided by the window
+        duration.
         """
-        v = [  # If you think of a better name, pls help!
-            0.5 * (a + b)
-            for a, b in zip(
-                self.intensities[index: index + self.window_size],
-                self.intensities[index + 1 : index + self.window_size + 1]
-            )]
+        # Account for the fact that the start and end of each window
+        # might not fall exactly on data points. The starting
+        # intensity is interpolated between the first (index) and
+        # second data point (index + 1) in the window. The ending
+        # intensity value is interpolated between the last and
+        # penultimate data points in the window.
+        window_start = self.start + index * self.data_stepsize
+        window_end = self.end + index * self.data_stepsize
+        lbound = self.interp(
+            self.data[index],
+            self.data[index + 1],
+            when=window_start,
+        )
+        rbound = self.interp(
+            self.data[index + self.ndata - 2],
+            self.data[index + self.ndata - 1],
+            when=window_end,
+        )
+        # window_data <- [lbound] + [...bulk...] + [rbound] where
+        # lbound and rbound are interpolated intensity values.
+        window_data = (
+            [lbound] +
+            self.data[index + 1: index + self.ndata - 1] +
+            [rbound]
+        )
+        acc = [
+            0.5 * (a.value + b.value) * (b.datetime - a.datetime).total_seconds()
+            for a, b in zip(window_data[:-1], window_data[1:])
+        ]
+        duration = window_data[-1].datetime - window_data[0].datetime
         return CarbonIntensityAverageEstimate(
-            start=self.times[index],
-            # Note that `end` points to the _start_ of the last
-            # interval in the window.
-            end=self.times[index + self.window_size],
-            value=sum(v) / self.window_size,
+            start=window_start,
+            end=window_end,
+            value=sum(acc) / duration.total_seconds(),
+        )
+
+    @staticmethod
+    def interp(
+            p1: CarbonIntensityPointEstimate,
+            p2: CarbonIntensityPointEstimate,
+            when: datetime
+    ) -> CarbonIntensityPointEstimate:
+        """Return carbon intensity pt estimate at a time between data
+        points, assuming points are joined by a straight line (linear
+        interpolation).
+        """
+        timestep = (p2.datetime - p1.datetime).total_seconds()
+
+        slope = (p2.value - p1.value) / timestep
+        offset = (when - p1.datetime).total_seconds()
+
+        return CarbonIntensityPointEstimate(
+            value=p1.value + slope * offset,
+            datetime=when,
         )
 
     def __iter__(self):
@@ -60,4 +137,4 @@ def __iter__(self):
             yield self.__getitem__(index)
 
     def __len__(self):
-        return len(self.times) - self.window_size - 1
+        return len(self.data) - self.ndata
diff --git a/cats/optimise_starttime.py b/cats/optimise_starttime.py
index 6fe1155..0c8a8c4 100644
--- a/cats/optimise_starttime.py
+++ b/cats/optimise_starttime.py
@@ -1,4 +1,4 @@
-from math import ceil
+from datetime import datetime
 
 from .forecast import WindowedForecast
 
@@ -16,13 +16,5 @@ def get_avg_estimates(data, duration=None):
     #     raise ValueError(
     #         "Windowed method timespan cannot be greater than the cached timespan"
     #     )
-
-    # get length of interval between timestamps
-    interval = (
-        data[1].datetime - data[0].datetime
-    ).total_seconds() / 60
-    wf = WindowedForecast(
-        data=data,
-        window_size=ceil(duration / interval)
-    )
+    wf = WindowedForecast(data, duration, start=datetime.now())
     return wf[0], min(wf)
diff --git a/tests/test_windowed_forecast.py b/tests/test_windowed_forecast.py
index 5390aea..8d59aad 100644
--- a/tests/test_windowed_forecast.py
+++ b/tests/test_windowed_forecast.py
@@ -24,7 +24,7 @@ def test_has_right_length():
     window_size = 160  # In number of time intervals
-    wf = WindowedForecast(DATA, window_size)
+    wf = WindowedForecast(DATA, window_size, start=DATA[0].datetime)
 
     # Expecting (200 - 160 - 1) (39) data points in the time
     # integrated timeseries.
@@ -39,7 +39,7 @@ def test_values():
     # a step size `step` small compared the the integration window
     window_size = 160
-    wf = WindowedForecast(DATA, window_size)
+    wf = WindowedForecast(DATA, window_size, start=DATA[0].datetime)
 
     expected = [
         math.cos((i + window_size) * step) - math.cos(i * step)
@@ -68,7 +68,9 @@ def test_minimise_average():
     ]
     window_size = 6
-    result = min(WindowedForecast(data, window_size))
+    # Data points separated by 30 minute intervals
+    duration = window_size * 30
+    result = min(WindowedForecast(data, duration, start=data[0].datetime))
 
     # Intensity point estimates over best runtime period
     v = [10, 8, 7, 7, 5, 8, 8]
@@ -95,7 +97,9 @@ def test_average_intensity_now():
     ]
     window_size = 11
-    result = WindowedForecast(data, window_size)[0]
+    # Data points separated by 30 minute intervals
+    duration = window_size * 30
+    result = WindowedForecast(data, duration, start=data[0].datetime)[0]
 
     # Intensity point estimates over best runtime period
     v = [p.value for p in data[:window_size + 1]]
@@ -107,3 +111,122 @@ def test_average_intensity_now():
         ) / window_size
     )
     assert (result == expected)
+
+
+def test_average_intensity_with_offset():
+    # Case where the job start and end times are not colocated with
+    # the carbon intensity data points. In this case cats interpolates
+    # the intensity value at the beginning and end of each potential
+    # job duration window.
+    CI_forecast = [
+        CarbonIntensityPointEstimate(26, datetime(2023,1,1,8,30)),
+        CarbonIntensityPointEstimate(40, datetime(2023,1,1,9,0)),
+        CarbonIntensityPointEstimate(50, datetime(2023,1,1,9,30)),
+        CarbonIntensityPointEstimate(60, datetime(2023,1,1,10,0)),
+        CarbonIntensityPointEstimate(25, datetime(2023,1,1,10,30)),
+    ]
+    duration = 70  # in minutes
+    # First available data point is for 08:30 but the job
+    # starts 15 minutes later.
+    job_start = datetime.fromisoformat("2023-01-01T08:45")
+    result = WindowedForecast(CI_forecast, duration, start=job_start)[1]
+
+    interp1 = 40 + 15 * (50 - 40) / 30
+    interp2 = 60 + 25 * (25 - 60) / 30
+    expected = CarbonIntensityAverageEstimate(
+        start=datetime(2023,1,1,9,15),
+        end=datetime(2023,1,1,10,25),
+        value=(
+            0.5 * (interp1 + 50) * 15 +
+            0.5 * (50 + 60) * 30 +
+            0.5 * (60 + interp2) * 25
+        ) / duration
+    )
+    assert result == expected
+
+    # Test that the WindowedForecast is able to work with a job start
+    # beyond the first data interval. Not technically required when
+    # working with carbonintensity.org.uk, but a useful generalisation
+    # nonetheless.
+
+    # When starting at 09:15 the WindowedForecast's 'data' attribute
+    # should discard the first data point at 08:30.
+    job_start = datetime.fromisoformat("2023-01-01T09:15")
+    result = WindowedForecast(CI_forecast, duration, start=job_start)[0]
+    assert result == expected
+
+def test_average_intensity_with_offset_long_job():
+    # Case where the job start and end times are not colocated with
+    # the carbon intensity data points. In this case cats interpolates
+    # the intensity value at the beginning and end of each potential
+    # job duration window.
+    with open(TEST_DATA, "r") as f:
+        csvfile = csv.reader(f, delimiter=",")
+        next(csvfile)  # Skip header line
+        data = [
+            CarbonIntensityPointEstimate(
+                datetime=datetime.fromisoformat(datestr[:-1]),
+                value=float(intensity_value),
+            )
+            for datestr, _, _, intensity_value in csvfile
+        ]
+
+    duration = 194  # in minutes
+    # First available data point is for 12:30 but the job
+    # starts 18 minutes later.
+    job_start = datetime.fromisoformat("2023-05-04T12:48")
+    result = WindowedForecast(data, duration, start=job_start)[2]
+
+    # First and last elements in v are interpolated intensity values,
+    # e.g. v[0] = 15 + 18min * (18 - 15) / 30min = 16.8
+    v = [16.8, 18, 19, 17, 16, 11, 11, 11, 11]
+    data_timestep = data[1].datetime - data[0].datetime  # 30 minutes
+    expected = CarbonIntensityAverageEstimate(
+        start=job_start + 2 * data_timestep,
+        end=job_start + 2 * data_timestep + timedelta(minutes=duration),
+        value=(
+            0.5 * (v[0] + v[1]) * 12 +
+            sum([0.5 * (a + b) * 30 for a, b in zip(v[1:-2], v[2:-1])]) +
+            0.5 * (v[7] + v[8]) * 2
+        ) / duration
+    )
+    assert (result == expected)
+
+def test_average_intensity_with_offset_short_job():
+    # Case where the job is short: start and end times fall between
+    # two consecutive data points (first and second).
+    with open(TEST_DATA, "r") as f:
+        csvfile = csv.reader(f, delimiter=",")
+        next(csvfile)  # Skip header line
+        data = [
+            CarbonIntensityPointEstimate(
+                datetime=datetime.fromisoformat(datestr[:-1]),
+                value=float(intensity_value),
+            )
+            for datestr, _, _, intensity_value in csvfile
+        ]
+
+    duration = 6  # in minutes
+    # First available data point is for 12:30 but the job
+    # starts 18 minutes later.
+    job_start = datetime.fromisoformat("2023-05-04T12:48")
+    result = WindowedForecast(data, duration, start=job_start)[2]
+
+    # Job starts at 12:48 and ends at 12:54. For each candidate
+    # running window, both start and end times fall between two
+    # consecutive data points (e.g. 13:30 and 14:00 for the third
+    # window).
+    #
+    # First and second elements in v are interpolated intensity
+    # values, e.g. v[0] = 15 + 18min * (18 - 15) / 30min = 16.8
+    # and v[1] = v[-1] = 15 + 24min * (18 - 15) / 30min = 17.4
+    v = [16.8, 17.4]
+    data_timestep = data[1].datetime - data[0].datetime
+    expected = CarbonIntensityAverageEstimate(
+        start=job_start + 2 * data_timestep,
+        end=job_start + 2 * data_timestep + timedelta(minutes=duration),
+        value=sum(
+            [0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]
+        ) / (len(v) - 1)
+    )
+    assert (result == expected)
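For reviewers, a minimal usage sketch of the new interface (illustration only, not part of the patch). It assumes CarbonIntensityPointEstimate and WindowedForecast are importable from cats.forecast, that point estimates take (value, datetime) positionally as in the tests above, and that forecast points are evenly spaced; all dates and intensity values are made up.

from datetime import datetime

from cats.forecast import CarbonIntensityPointEstimate, WindowedForecast

# Hypothetical half-hourly forecast (values made up).
forecast = [
    CarbonIntensityPointEstimate(45, datetime(2023, 1, 1, 13, 0)),
    CarbonIntensityPointEstimate(62, datetime(2023, 1, 1, 13, 30)),
    CarbonIntensityPointEstimate(58, datetime(2023, 1, 1, 14, 0)),
    CarbonIntensityPointEstimate(33, datetime(2023, 1, 1, 14, 30)),
    CarbonIntensityPointEstimate(30, datetime(2023, 1, 1, 15, 0)),
]

# A 45 minute job starting at 13:10, i.e. between the first two data
# points, so both window boundaries are interpolated.
wf = WindowedForecast(forecast, duration=45, start=datetime(2023, 1, 1, 13, 10))

now = wf[0]     # average intensity if the job starts now (13:10-13:55)
best = min(wf)  # candidate window with the lowest average intensity
print(now.start, now.end, now.value)
print(best.start, best.end, best.value)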