s6.app._contextgeneratorsΒΆ

class s6.app._contextgenerators.ContextGenerator(dataset_dir: str | None, log_dir: str | None, pipeline: BasePipeline | None, manually_output_mode: bool = False, run_pipeline: bool = True, **keyargs)

Bases: ABC

An abstract base class for generating frames of context in a streaming fashion.

Subclasses should implement get_frame() to specify how new frames are retrieved.

MAX_HISTORY_LENGTH = 10
classmethod command_handler(command)

Decorator to mark a method as a handler for a command.

abstractmethod get_frame() Iterator[Dict[str, Any]]

Generator method that yields individual frames as dictionaries.

property pipeline_config
property dataset: StructuredDataset | None
property calibration_file: str | None
run(queue: Queue | None = None, command_queue: Queue | None = None, shutdown_event: Event | None = None)
class s6.app._contextgenerators.DatasetContextGenerator(input_dataset_dir: str, repeat: bool = False, realtime_playback: bool = False, **kwargs)

Bases: ContextGenerator

Reads frames from an existing dataset on disk.

get_frame() Iterator[Dict[str, Any]]

Generator method that yields individual frames as dictionaries.

class s6.app._contextgenerators.BaseGSTContextGenerator(dataset_dir: str | None, log_dir: str | None, pipeline: BasePipeline | None, manually_output_mode: bool = False, run_pipeline: bool = True, **keyargs)

Bases: ContextGenerator

Shared helpers for GStreamer-backed context generators.

class s6.app._contextgenerators.RemoteGSTContextGenerator(dataset_dir: str | None, log_dir: str | None, pipeline: BasePipeline | None, manually_output_mode: bool = False, run_pipeline: bool = True, **keyargs)

Bases: BaseGSTContextGenerator

Retrieve frames from remote gstreamer camera sources using external config.

get_frame() Iterator[Dict[str, Any]]

Generator method that yields individual frames as dictionaries.

class s6.app._contextgenerators.LocalGSTContextGenerator(dataset_dir: str | None, log_dir: str | None, pipeline: BasePipeline | None, manually_output_mode: bool = False, run_pipeline: bool = True, **keyargs)

Bases: BaseGSTContextGenerator

Retrieve frames from local GStreamer camera sources using config.

get_frame() Iterator[Dict[str, Any]]

Generator method that yields individual frames as dictionaries.