Source code for xocto.ranges

from __future__ import annotations

import datetime
import enum
import functools
import itertools
import operator
from typing import (
    Any,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    TypeVar,
    Union,
    cast,
)

from . import types


class RangeBoundaries(enum.Enum):
    """
    The four ways the two endpoints of a range can be included or excluded.

    Each member's value is the conventional bracket notation: a round bracket
    marks an exclusive endpoint and a square bracket an inclusive one.
    """

    EXCLUSIVE_EXCLUSIVE = "()"
    EXCLUSIVE_INCLUSIVE = "(]"
    INCLUSIVE_EXCLUSIVE = "[)"
    INCLUSIVE_INCLUSIVE = "[]"

    @classmethod
    def from_bounds(
        cls, left_exclusive: bool, right_exclusive: bool
    ) -> "RangeBoundaries":
        """
        Look up the boundary type matching the exclusivity of each endpoint.

        Args:
            left_exclusive: True when the left endpoint is excluded.
            right_exclusive: True when the right endpoint is excluded.

        Raises:
            KeyError: If the pair of flags does not compare equal to a pair of booleans.
        """
        by_exclusivity = {
            (False, False): cls.INCLUSIVE_INCLUSIVE,
            (False, True): cls.INCLUSIVE_EXCLUSIVE,
            (True, False): cls.EXCLUSIVE_INCLUSIVE,
            (True, True): cls.EXCLUSIVE_EXCLUSIVE,
        }
        return by_exclusivity[left_exclusive, right_exclusive]
# Type variable used for the endpoints of ranges throughout this module. It is bound to
# types.Comparable, so only types satisfying that bound are accepted by a type checker.
T = TypeVar("T", bound=types.Comparable)  # type: ignore[type-arg]
@functools.total_ordering
class Range(Generic[T]):
    """
    A span between two endpoints of some comparable type.

    A range is described by its ``start``, its ``end`` and its boundaries, i.e. whether
    each endpoint is itself part of the range (inclusive) or not (exclusive). The class
    offers helpers for comparing, testing membership and combining ranges.

    Usage:

    * A range is declared from its endpoints and boundaries. When the boundaries are
      omitted the range is inclusive-exclusive.

        >>> Range(0, 2, boundaries=RangeBoundaries.EXCLUSIVE_INCLUSIVE)
        <Range: (0,2]>
        >>> Range(0, 2, boundaries="[]")
        <Range: [0,2]>
        >>> r = Range(0, 2)
        >>> print(f"{r}")
        [0,2)

    * An endpoint of None means the range is unbounded on that side. An unbounded side
      must be exclusive. The ``continuum`` helper returns the range that is unbounded on
      both sides.

        >>> Range(0, None)
        <Range: [0,None)>
        >>> Range(0, None, boundaries="[]")  # Invalid
        Traceback (most recent call last):
            ...
        ValueError: Range with unbounded end must be right-exclusive
        >>> Range.continuum()  # Helper function
        <Range: (None,None)>

    * Ranges can be declared for any comparable type.

        >>> int_erval: Range[int] = Range(0, 2)
        >>> date_erval: Range[date] = Range(date(2020, 1, 1), date(2020, 6, 6))
        >>> string_erval: Range[str] = Range("aardvark", "zebra")  # Uses lexical ordering

    * Ranges are themselves comparable. Two ranges are ordered by their start, with
      their end used to break ties. An unbounded start sorts before any finite start and
      an unbounded end sorts after any finite end. For equal endpoints, an inclusive start
      sorts before an exclusive start and an exclusive end sorts before an inclusive end.

        >>> sorted([Range(1, 4), Range(0, 5)])
        [<Range: [0,5)>, <Range: [1,4)>]
        >>> sorted([Range(1, 2), Range(None, 2, boundaries="()")])
        [<Range: (None,2)>, <Range: [1,2)>]
        >>> sorted([Range(3, 5), Range(3, 4)])
        [<Range: [3,4)>, <Range: [3,5)>]
        >>> sorted([Range(0, 2, boundaries=b) for b in RangeBoundaries])
        [<Range: [0,2)>, <Range: [0,2]>, <Range: (0,2)>, <Range: (0,2]>]

    * The ``in`` operator checks whether an item of the range type lies within the range.

        >>> 0 in Range(0, 2)
        True
        >>> 2 in Range(0, 2)
        False
        >>> date(2020, 1, 1) in Range(date(2020, 1, 2), date(2020, 1, 5))
        False

    * ``intersection`` (aliased to the ``&`` operator) returns the overlap of two ranges,
      or None if they are disjoint.

        >>> Range(0, 2).intersection(Range(1, 4))
        <Range: [1,2)>
        >>> (Range(1, 2) & Range(3, 4)) is None
        True

    * ``is_disjoint`` tells you whether two ranges are disjoint.

        >>> Range(0, 2).is_disjoint(Range(3, 5))
        True
        >>> Range(0, 2).is_disjoint(Range(2, 5))  # Since the default is not right-inclusive
        True

    * ``union`` (aliased to the ``|`` operator) returns a range covering two overlapping
      (or touching) ranges, or None if they are disjoint.

        >>> Range(0, 2).union(Range(1, 3))
        <Range: [0,3)>
        >>> Range(0, 2) | Range(2, 4)
        <Range: [0,4)>
        >>> (Range(0, 2) | Range(3, 4)) is None
        True
        >>> (Range(0, 2) | Range(2, 4, boundaries="(]")) is None
        True
        >>> # Range doesn't understand that 2 and 3 are adjacent.
        >>> (Range(0, 2, boundaries="[]") | Range(3, 4)) is None
        True

    * ``difference`` (aliased to the ``-`` operator) returns the part of this range not
      covered by the other range: a single range, a rangeset when the other range splits
      this one in two, or None when the other range covers this one entirely.

        >>> Range(0, 4) - Range(2, 4)
        <Range: [0,2)>
        >>> Range(0, 4) - Range(2, 3)
        <RangeSet: {[0,2), [3,4)}>
        >>> (Range(0, 4) - Range(0, 5)) is None
        True
    """

    __slots__ = [
        "start",
        "end",
        "boundaries",
        "_is_left_exclusive",
        "_is_left_inclusive",
        "_is_right_exclusive",
        "_is_right_inclusive",
    ]

    def __init__(
        self,
        start: Optional[T],
        end: Optional[T],
        *,
        boundaries: Union[str, RangeBoundaries] = RangeBoundaries.INCLUSIVE_EXCLUSIVE,
    ):
        """
        Store the endpoints and boundaries, cache per-side inclusivity flags and check
        that the combination describes a valid range.

        Raises:
            ValueError: If ``boundaries`` is not a valid boundary value, if an unbounded
                side is inclusive, or if finite endpoints are in the wrong order for
                the boundaries.
        """
        self.start = start
        self.end = end

        # Accept either the enum member or its bracket string.
        kind = RangeBoundaries(boundaries)
        self.boundaries: RangeBoundaries = kind

        # Every boundary value is a two-character string: one bracket per side.
        opening, closing = kind.value
        self._is_left_exclusive = opening == "("
        self._is_right_exclusive = closing == ")"
        self._is_left_inclusive = not self._is_left_exclusive
        self._is_right_inclusive = not self._is_right_exclusive

        if start is None and self._is_left_inclusive:
            raise ValueError("Range with unbounded start must be left-exclusive")

        if end is None:
            if self._is_right_inclusive:
                raise ValueError("Range with unbounded end must be right-exclusive")
            return

        if start is None:
            # Nothing to compare the finite end against.
            return

        # Only a doubly-inclusive range may have equal endpoints.
        if kind is RangeBoundaries.INCLUSIVE_INCLUSIVE:
            endpoints_ordered = start <= end
        else:
            endpoints_ordered = start < end
        if not endpoints_ordered:
            raise ValueError("Invalid boundaries for range")

    @classmethod
    def continuum(cls) -> Range[T]:
        """
        Return the range that is unbounded on both sides.
        """
        return cls(None, None, boundaries="()")

    def __str__(self) -> str:
        opening = "(" if self._is_left_exclusive else "["
        closing = ")" if self._is_right_exclusive else "]"
        return f"{opening}{self.start},{self.end}{closing}"

    def __repr__(self) -> str:
        return f"<Range: {self!s}>"

    def __hash__(self) -> int:
        return hash((self.start, self.end, self.boundaries))

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, Range):
            mine = (self.start, self.end, self.boundaries)
            theirs = (other.start, other.end, other.boundaries)
            return mine == theirs
        return False

    def __lt__(self, other: "Range[T]") -> bool:
        if not (self.start == other.start):
            # An unbounded start sorts before any finite start.
            if self.start is None:
                return True
            if other.start is None:
                return False
            return self.start < other.start

        if self._is_left_exclusive != other._is_left_exclusive:
            # Same start value: the left-inclusive range begins first.
            return other._is_left_exclusive

        if self.end == other.end:
            # Same end value: the right-exclusive range finishes first.
            return self._is_right_exclusive and not other._is_right_exclusive

        # An unbounded end sorts after any finite end.
        if other.end is None:
            return True
        if self.end is None:
            return False
        return self.end < other.end

    def __contains__(self, item: T) -> bool:
        """
        Report whether the item lies within both bounds of the range.
        """
        return self._is_inside_left_bound(item) and self._is_inside_right_bound(item)

    def _is_inside_left_bound(self, item: T) -> bool:
        """
        Report whether the item is at or beyond the start, honouring exclusivity.
        """
        lower = self.start
        if lower is None:
            return True
        if self._is_left_exclusive:
            return item > lower
        return item >= lower

    def _is_inside_right_bound(self, item: T) -> bool:
        """
        Report whether the item is at or before the end, honouring exclusivity.
        """
        upper = self.end
        if upper is None:
            return True
        if self._is_right_exclusive:
            return item < upper
        return item <= upper

    def is_disjoint(self, other: "Range[T]") -> bool:
        """
        Report whether the two ranges have no point in common.
        """
        # This range lies entirely before the other one.
        if (
            self.end is not None
            and other.start is not None
            and (
                not self._is_inside_right_bound(other.start)
                or not other._is_inside_left_bound(self.end)
            )
        ):
            return True

        # This range lies entirely after the other one.
        if (
            self.start is not None
            and other.end is not None
            and (
                not self._is_inside_left_bound(other.end)
                or not other._is_inside_right_bound(self.start)
            )
        ):
            return True

        return False

    def intersection(self, other: "Range[T]") -> Optional["Range[T]"]:
        """
        Return the overlap of the two ranges, or None when they are disjoint.
        """
        if self.is_disjoint(other):
            return None

        earlier, later = (self, other) if self < other else (other, self)

        # The overlap always begins where the later range begins. It finishes at the
        # end of the earlier range when that end lies within the later range, and at
        # the end of the later range otherwise.
        if earlier.end is not None and later._is_inside_right_bound(earlier.end):
            closing_range = earlier
        else:
            closing_range = later

        boundaries = RangeBoundaries.from_bounds(
            later._is_left_exclusive, closing_range._is_right_exclusive
        )
        return Range(later.start, closing_range.end, boundaries=boundaries)

    def union(self, other: "Range[T]") -> Optional["Range[T]"]:
        """
        Return one range covering both ranges when they overlap or touch, else None.

        Two ranges touch when one ends at the value where the other starts and at
        least one of them includes that value.
        """
        earlier, later = (self, other) if self < other else (other, self)

        if earlier.is_disjoint(later):
            touching = earlier.end == later.start and (
                earlier._is_right_inclusive or later._is_left_inclusive
            )
            if not touching:
                return None

        # The union always begins where the earlier range begins. It finishes at the
        # end of the earlier range when the later range's end lies within it, and at
        # the end of the later range otherwise.
        if later.end is not None and earlier._is_inside_right_bound(later.end):
            closing_range = earlier
        else:
            closing_range = later

        boundaries = RangeBoundaries.from_bounds(
            earlier._is_left_exclusive, closing_range._is_right_exclusive
        )
        return Range(earlier.start, closing_range.end, boundaries=boundaries)

    def difference(
        self, other: "Range[T]"
    ) -> Optional[Union["Range[T]", "RangeSet[T]"]]:
        """
        Return the parts of this range that the other range does not cover.

        Returns:
            This range itself when the two are disjoint, a single range when one
            side remains, a RangeSet when the other range splits this one in two,
            or None when the other range covers this one completely.
        """
        if self.is_disjoint(other):
            return self

        # Part of this range sticking out below the start of the other range.
        lower_part: Optional["Range[T]"] = None
        if other.start is not None and (
            self.start is None
            or (
                not other._is_inside_left_bound(self.start)
                and self._is_inside_left_bound(other.start)
            )
        ):
            lower_part = Range(
                self.start,
                other.start,
                boundaries=RangeBoundaries.from_bounds(
                    self._is_left_exclusive, other._is_left_inclusive
                ),
            )

        # Part of this range sticking out above the end of the other range.
        upper_part: Optional["Range[T]"] = None
        if other.end is not None and (
            self.end is None
            or (
                not other._is_inside_right_bound(self.end)
                and self._is_inside_right_bound(other.end)
            )
        ):
            upper_part = Range(
                other.end,
                self.end,
                boundaries=RangeBoundaries.from_bounds(
                    other._is_right_inclusive, self._is_right_exclusive
                ),
            )

        remainder = [part for part in (lower_part, upper_part) if part is not None]
        if not remainder:
            return None
        if len(remainder) == 2:
            return RangeSet(remainder)
        return remainder[0]

    def __and__(self, other: "Range[T]") -> Optional["Range[T]"]:
        """
        Operator form of ``intersection`` (``a & b``).
        """
        return self.intersection(other)

    def __or__(self, other: "Range[T]") -> Optional["Range[T]"]:
        """
        Operator form of ``union`` (``a | b``).
        """
        return self.union(other)

    def __sub__(self, other: "Range[T]") -> Optional[Union["Range[T]", "RangeSet[T]"]]:
        """
        Operator form of ``difference`` (``a - b``).
        """
        return self.difference(other)

    def is_left_finite(self) -> bool:
        """Report whether the range has a start value (i.e. start is not None)."""
        return self.start is not None

    def is_right_finite(self) -> bool:
        """Report whether the range has an end value (i.e. end is not None)."""
        return self.end is not None

    def is_finite(self) -> bool:
        """Report whether the range is bounded on both sides."""
        return self.is_left_finite() and self.is_right_finite()
class FiniteRange(Range[T]):
    """
    A range whose endpoints MUST both be finite (i.e. neither may be None).

    This exists mostly for type checking: a function can declare that it needs a
    finite range and then use the endpoints without checking them for None.
    """

    start: T
    end: T

    def intersection(self, other: Range[T]) -> Optional["FiniteRange[T]"]:
        """
        Return the overlap with the other range, typed as finite.

        An intersection with a finite range is always finite. The cast only informs
        the type checker; the value is whatever the parent implementation returns.
        """
        overlap = super().intersection(other)
        return cast("FiniteRange[T]", overlap)

    def __and__(self, other: Range[T]) -> Optional["FiniteRange[T]"]:
        """Operator form of ``intersection`` (``a & b``)."""
        return self.intersection(other)
class HalfFiniteRange(Range[T]):
    """
    A range with a finite inclusive start and a (possibly unbounded) exclusive end.

    Like FiniteRange this is for type checking, but it describes a very common range
    type in Kraken (possibly the most common). LeftFiniteInclusiveRightExclusiveRange
    would be a more exact name, but it is a bit of a mouthful, hence HalfFiniteRange.
    """

    start: T
    boundaries = RangeBoundaries.INCLUSIVE_EXCLUSIVE

    def intersection(self, other: Range[T]) -> Optional["HalfFiniteRange[T]"]:
        """
        Return the overlap with the other range, typed as half finite.

        An intersection with a half-finite range is always half finite. The cast only
        informs the type checker; the value is whatever the parent implementation returns.
        """
        overlap = super().intersection(other)
        return cast("HalfFiniteRange[T]", overlap)

    def __and__(self, other: Range[T]) -> Optional["HalfFiniteRange[T]"]:
        """Operator form of ``intersection`` (``a & b``)."""
        return self.intersection(other)
# Type aliases for common range types DatetimeRange = Range[datetime.datetime] HalfFiniteDatetimeRange = HalfFiniteRange[datetime.datetime]
class RangeSet(Generic[T]):
    """
    An ordered collection of ranges that do not overlap or touch one another.

    A RangeSet is built from an iterable of Ranges. The iterable may be omitted to
    create an empty RangeSet:

        >>> RangeSet()  # empty RangeSet
        <RangeSet: {}>
        >>> rs = RangeSet([Range(0, 1), Range(2, 4)])  # Single iterable of ranges
        >>> print(f"{rs}")
        {[0,1), [2,4)}

    Ranges that overlap (or touch) are merged when they are added to a set:

        >>> RangeSet([Range(0, 3), Range(2, 4)]) == RangeSet([Range(0, 4)])
        True
    """

    def __init__(self, source_ranges: Optional[Iterable[Range[T]]] = None):
        # This is typed as a Sequence so that it is covariant.
        self._ranges: Sequence[Range[T]] = (
            [] if source_ranges is None else self._condense_range_list(source_ranges)
        )

    def __str__(self) -> str:
        members = ", ".join(map(str, self._ranges))
        return "{" + members + "}"

    def __repr__(self) -> str:
        return f"<RangeSet: {self!s}>"

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, RangeSet):
            return self._ranges == other._ranges
        return False

    def add(self, item: Range[T]) -> None:
        """Add a range to the set, merging it with any ranges it overlaps or touches."""
        self._ranges = self._condense_range_list([*self._ranges, item])

    def _condense_range_list(self, source_ranges: Iterable[Range[T]]) -> List[Range[T]]:
        """
        Sort the source ranges and merge them into the smallest number of ranges
        that covers the same values.

        For example:

        * [[0,4), [1,5), [7,9)] -> [[0,5), [7,9)]
        """
        # Because the input is sorted, a new range can only ever merge with the most
        # recently kept range, so that is the only one to test against.
        condensed: List[Range[T]] = []
        for candidate in sorted(source_ranges):
            merged = (condensed[-1] | candidate) if condensed else None
            if merged is None:
                condensed.append(candidate)
            else:
                condensed[-1] = merged
        return condensed

    def discard(self, item: Range[T]) -> None:
        """
        Cut the given range out of the set.

        Every range in the set loses whatever part of it intersects ``item``.
        """
        remaining: List[Range[T]] = []
        for member in self._ranges:
            leftover = member - item
            if isinstance(leftover, RangeSet):
                # The member was split in two.
                remaining.extend(leftover._ranges)
            elif leftover is not None:
                remaining.append(leftover)
            # A leftover of None means the member was entirely covered by ``item``.
        self._ranges = self._condense_range_list(remaining)

    def pop(self) -> Range[T]:
        """
        Remove and return a range from the set (the first one in sorted order).

        Raises:
            KeyError: If the set is empty.
        """
        try:
            first = self._ranges[0]
        except IndexError:
            raise KeyError
        else:
            self.discard(first)
            return first

    def __contains__(self, item: Range[T] | T) -> bool:
        """
        Report whether a range or a single item lies inside the set.

        An exact match is not required: the target only has to sit inside one of the
        ranges in the set.
        """
        return (
            self.contains_range(item)
            if isinstance(item, Range)
            else self.contains_item(item)
        )

    def contains_range(self, range: Range[T]) -> bool:
        """Check whether the given Range is fully contained in one Range of this RangeSet."""
        for member in self._ranges:
            # The range is inside the member when intersecting leaves it unchanged.
            if (member & range) == range:
                return True
        return False

    def contains_item(self, item: T) -> bool:
        """Check whether the item is contained in any Range of this RangeSet."""
        for member in self:
            if item in member:
                return True
        return False

    def is_disjoint(self, other: "RangeSet[T]") -> bool:
        """
        Report whether no range of this RangeSet overlaps any range of the other.
        """
        for mine in self._ranges:
            for theirs in other._ranges:
                if not mine.is_disjoint(theirs):
                    return False
        return True

    def __iter__(self) -> Iterator[Range[T]]:
        for member in self._ranges:
            yield member

    def __len__(self) -> int:
        """
        Return the number of ranges held.

        This isn't particularly useful for consumers, but makes equality work nicely.
        """
        return len(self._ranges)

    def intersection(self, other: Range[T] | RangeSet[T]) -> "RangeSet[T]":
        """
        Return the parts of this RangeSet that also lie in the other Range or RangeSet.
        """
        candidates: List[Range[T]] = (
            [other] if isinstance(other, Range) else list(other)
        )
        overlaps: List[Range[T]] = []
        for mine in self._ranges:
            for theirs in candidates:
                shared = mine & theirs
                if shared is not None:
                    overlaps.append(shared)
        return RangeSet(overlaps)

    def union(self, other: "RangeSet[T]") -> "RangeSet[T]":
        """Return a RangeSet covering everything in this RangeSet or the other."""
        return RangeSet([*self._ranges, *other._ranges])

    def __and__(self, other: "RangeSet[T]") -> "RangeSet[T]":
        """
        Operator form of ``intersection`` (``a & b``).
        """
        return self.intersection(other)

    def __or__(self, other: "RangeSet[T]") -> "RangeSet[T]":
        """
        Operator form of ``union`` (``a | b``).
        """
        return self.union(other)

    def is_left_finite(self) -> bool:
        """Report whether every range in the set has a finite start."""
        return all(member.is_left_finite() for member in self._ranges)

    def is_right_finite(self) -> bool:
        """Report whether every range in the set has a finite end."""
        return all(member.is_right_finite() for member in self._ranges)

    def is_finite(self) -> bool:
        """Report whether every range in the set is finite on both sides."""
        return all(member.is_finite() for member in self._ranges)

    def complement(self) -> RangeSet[T]:
        """
        Return the RangeSet of everything this RangeSet does not cover.

        That is the gaps between consecutive ranges, plus the unbounded stretches
        before the first range and after the last one where those are finite.
        """
        if not self:
            everything: Range[T] = Range(
                None, None, boundaries=RangeBoundaries.EXCLUSIVE_EXCLUSIVE
            )
            return RangeSet([everything])

        # Each gap endpoint has the opposite inclusivity to the range endpoint it
        # borders, so that every value ends up on exactly one side.
        gaps: list[Range[T]] = []

        head = self._ranges[0]
        if head.is_left_finite():
            gaps.append(
                Range(
                    None,
                    head.start,
                    boundaries=RangeBoundaries.from_bounds(
                        left_exclusive=True,
                        right_exclusive=head._is_left_inclusive,
                    ),
                )
            )

        for before, after in zip(self._ranges, self._ranges[1:]):
            gaps.append(
                Range(
                    before.end,
                    after.start,
                    boundaries=RangeBoundaries.from_bounds(
                        left_exclusive=before._is_right_inclusive,
                        right_exclusive=after._is_left_inclusive,
                    ),
                )
            )

        tail = self._ranges[-1]
        if tail.is_right_finite():
            gaps.append(
                Range(
                    tail.end,
                    None,
                    boundaries=RangeBoundaries.from_bounds(
                        left_exclusive=tail._is_right_inclusive,
                        right_exclusive=True,
                    ),
                )
            )

        return RangeSet(gaps)

    def __neg__(self) -> RangeSet[T]:
        """Operator form of ``complement`` (``-a``)."""
        return self.complement()

    def __bool__(self) -> bool:
        """A RangeSet is truthy when it holds at least one range."""
        return len(self) > 0

    def difference(self, other: RangeSet[T]) -> RangeSet[T]:
        """
        Return the parts of this RangeSet that do not lie in the other RangeSet.
        """
        outside_other = -other
        return self & outside_other

    def __sub__(self, other: RangeSet[T]) -> RangeSet[T]:
        """Operator form of ``difference`` (``a - b``)."""
        return self.difference(other)
class HalfFiniteRangeSet(RangeSet[T], Generic[T]):
    """
    A RangeSet for half-finite intervals.

    It lets us state stronger guarantees to the type checker than a plain RangeSet
    can - mostly that intersections and popped elements are half-finite.
    """

    _ranges: Sequence[HalfFiniteRange[T]]

    def __iter__(self) -> Iterator[HalfFiniteRange[T]]:
        for member in self._ranges:
            yield member

    def intersection(self, other: Range[T] | RangeSet[T]) -> "HalfFiniteRangeSet[T]":
        """Return the parent's intersection, typed as a HalfFiniteRangeSet."""
        overlap = super().intersection(other)
        return cast("HalfFiniteRangeSet[T]", overlap)

    def pop(self) -> HalfFiniteRange[T]:
        """Remove and return a range, typed as half finite. Raises KeyError if empty."""
        popped = super().pop()
        return cast(HalfFiniteRange[T], popped)

    def __and__(self, other: RangeSet[T]) -> "HalfFiniteRangeSet[T]":
        """Operator form of ``intersection`` (``a & b``)."""
        return self.intersection(other)
class FiniteRangeSet(RangeSet[T]):
    """
    A RangeSet for finite intervals.

    It lets us state stronger guarantees to the type checker than a plain RangeSet
    can - mostly that intersections and popped elements are finite.
    """

    _ranges: Sequence[FiniteRange[T]]

    def __iter__(self) -> Iterator[FiniteRange[T]]:
        for member in self._ranges:
            yield member

    def intersection(self, other: Range[T] | RangeSet[T]) -> "FiniteRangeSet[T]":
        """Return the parent's intersection, typed as a FiniteRangeSet."""
        overlap = super().intersection(other)
        return cast("FiniteRangeSet[T]", overlap)

    def pop(self) -> FiniteRange[T]:
        """
        Remove and return a range from the set, typed as finite.

        Raises KeyError if the set is empty.
        """
        popped = super().pop()
        return cast(FiniteRange[T], popped)

    def __and__(self, other: RangeSet[T]) -> "FiniteRangeSet[T]":
        """Operator form of ``intersection`` (``a & b``)."""
        return self.intersection(other)
class FiniteDatetimeRange(FiniteRange[datetime.datetime]):
    """
    A helper for a common use of ranges: a finite interval of time.
    """

    def __init__(self, start: datetime.datetime, end: datetime.datetime):
        """
        Build the range, always with inclusive-exclusive ``[)`` boundaries.
        """
        super().__init__(start, end, boundaries=RangeBoundaries.INCLUSIVE_EXCLUSIVE)

    def intersection(
        self, other: Range[datetime.datetime]
    ) -> Optional["FiniteDatetimeRange"]:
        """
        Return the overlap with the other range as a FiniteDatetimeRange, or None
        when the two are disjoint.

        An intersection with a finite range is always finite. The overlap is expected
        to have ``[)`` boundaries; this is asserted before it is rebuilt.
        """
        overlap = super().intersection(other)
        if overlap is not None:
            assert overlap.boundaries == RangeBoundaries.INCLUSIVE_EXCLUSIVE
            return FiniteDatetimeRange(overlap.start, overlap.end)
        return None

    def __and__(
        self, other: Range[datetime.datetime]
    ) -> Optional["FiniteDatetimeRange"]:
        """Operator form of ``intersection`` (``a & b``)."""
        return self.intersection(other)

    @property
    def days(self) -> int:
        """
        The ``days`` component of the timedelta between the start and end of the range.
        """
        elapsed = self.end - self.start
        return elapsed.days

    @property
    def seconds(self) -> int:
        """
        The total number of seconds between the start and end, truncated to an int.
        """
        elapsed = self.end - self.start
        return int(elapsed.total_seconds())
class FiniteDateRange(FiniteRange[datetime.date]):
    """
    A helper for a common use of ranges: a finite interval of whole days.
    """

    def __init__(self, start: datetime.date, end: datetime.date):
        """
        Build the range, always with inclusive-inclusive ``[]`` boundaries.
        """
        super().__init__(start, end, boundaries=RangeBoundaries.INCLUSIVE_INCLUSIVE)

    def intersection(self, other: Range[datetime.date]) -> Optional["FiniteDateRange"]:
        """
        Return the overlap with the other range as a FiniteDateRange, or None when
        there is none.

        An intersection with a finite range is always finite. The overlap is expected
        to have ``[]`` boundaries; this is asserted before it is rebuilt.
        """
        try:
            overlap = super().intersection(other)
        except ValueError:
            # Adjacent date ranges are not reported as disjoint by this class, so the
            # parent tries to build an overlap for them and fails: they share no days.
            return None

        if overlap is None:
            return None

        assert overlap.boundaries == RangeBoundaries.INCLUSIVE_INCLUSIVE
        return FiniteDateRange(overlap.start, overlap.end)

    def union(self, other: Range[datetime.date]) -> Optional["FiniteDateRange"]:
        """
        Return the union with the other range as a FiniteDateRange, or None when the
        parent implementation finds no union or fails to build one.

        The union is expected to be finite with ``[]`` boundaries; this is asserted
        before it is rebuilt.
        """
        try:
            combined = super().union(other)
        except ValueError:
            return None

        if combined is None:
            return None

        assert combined.boundaries == RangeBoundaries.INCLUSIVE_INCLUSIVE
        assert combined.start is not None
        assert combined.end is not None
        return FiniteDateRange(combined.start, combined.end)

    def is_disjoint(self, other: Range[datetime.date]) -> bool:
        """
        Report whether the two date ranges are disjoint, treating adjacent days as
        not disjoint.
        """
        # Widen each inclusive end of the other range by a day so that a range that is
        # merely adjacent to this one is seen as overlapping it.
        one_day = datetime.timedelta(days=1)

        widened_start = other.start
        if other._is_left_inclusive:
            assert widened_start is not None
            try:
                widened_start = widened_start - one_day
            except OverflowError:
                # Already at the earliest representable date; leave it as it is.
                pass

        widened_end = other.end
        if other._is_right_inclusive:
            assert widened_end is not None
            try:
                widened_end = widened_end + one_day
            except OverflowError:
                # Already at the latest representable date; leave it as it is.
                pass

        widened = Range(start=widened_start, end=widened_end, boundaries=other.boundaries)
        return super().is_disjoint(widened)

    def __and__(self, other: Range[datetime.date]) -> Optional["FiniteDateRange"]:
        """Operator form of ``intersection`` (``a & b``)."""
        return self.intersection(other)

    @property
    def days(self) -> int:
        """
        The number of days in the range, counting both the start and the end day.
        """
        elapsed = self.end - self.start
        return elapsed.days + 1
def get_finite_datetime_ranges_from_timestamps(
    finite_datetime_range: FiniteRange[datetime.datetime],
    timestamps: Iterable[datetime.datetime],
) -> Sequence[FiniteDatetimeRange]:
    """
    Split a datetime range into consecutive sub-ranges at the given timestamps.

    Only timestamps that fall within the range are used; they are deduplicated and
    sorted first.

    Example:
        - Input:
            - period: ("2021-09-01 00:00:00", "2021-10-01 00:00:00")
            - timestamps: ["2021-09-10 00:00:00", "2021-09-16 00:00:00", "2021-09-23 00:00:00"]
        - Return:
            [
                ("2021-09-01 00:00:00", "2021-09-10 00:00:00"),
                ("2021-09-10 00:00:00", "2021-09-16 00:00:00"),
                ("2021-09-16 00:00:00", "2021-09-23 00:00:00"),
                ("2021-09-23 00:00:00", "2021-10-01 00:00:00"),
            ]
    """
    cut_points = sorted(
        {moment for moment in timestamps if moment in finite_datetime_range}
    )
    edges = [finite_datetime_range.start, *cut_points, finite_datetime_range.end]

    pieces = []
    for piece_start, piece_end in zip(edges, edges[1:]):
        # A timestamp exactly at the start or end of the period would give an empty
        # piece, so there is nothing to split off there.
        if piece_start != piece_end:
            pieces.append(FiniteDatetimeRange(piece_start, piece_end))
    return pieces
def any_overlapping(ranges: Iterable[Range[T]]) -> bool:
    """Return true if any of the passed Ranges are overlapping."""
    pending = list(ranges)
    if not pending:
        return False

    head, *rest = pending
    # Everything seen so far; each new range is tested against it before joining it.
    seen = RangeSet[T]([head])
    for candidate in rest:
        if seen.intersection(candidate):
            return True
        seen.add(candidate)
    return False
def as_finite_datetime_periods(
    periods: Iterable[HalfFiniteDatetimeRange | DatetimeRange],
) -> Sequence[FiniteDatetimeRange]:
    """
    Convert the given date/time periods into finite periods.

    This is useful when working with potentially infinite ranges that are known to
    be finite, e.g. due to intersection with a finite range.

    Raises:
        ValueError: If one or more periods is not finite.
    """
    converted = []
    for period in periods:
        period_start, period_end = period.start, period.end
        if period_start is None or period_end is None:
            raise ValueError("Period is not finite at start or end or both")
        converted.append(FiniteDatetimeRange(period_start, period_end))
    return converted