s6.app._contextgeneratorsΒΆ

class s6.app._contextgenerators.FramesPerSecondColumn(table_column: Column | None = None)

Bases: ProgressColumn

Render progress speed using frames/s instead of Rich’s default it/s.

render(task) Text

Should return a renderable object.

class s6.app._contextgenerators.ContextGenerator(dataset_dir: str | None, log_dir: str | None, pipeline: BasePipeline | None, run_pipeline: bool = True, **keyargs)

Bases: ABC

An abstract base class for generating frames of context in a streaming fashion.

Subclasses should implement get_frame() to specify how new frames are retrieved.

MAX_HISTORY_LENGTH = 10
classmethod command_handler(command)

Decorator to mark a method as a handler for a command.

abstractmethod get_frame() Iterator[Dict[str, Any]]

Generator method that yields individual frames as dictionaries.

property pipeline_config
property dataset: StructuredDataset | None
property calibration_file: str | None
run(queue: Queue | None = None, command_queue: Queue | None = None, shutdown_event: Event | None = None, queue_payload_mode: str = 'context')
class s6.app._contextgenerators.DatasetContextGenerator(input_dataset_dir: str, repeat: bool = False, realtime_playback: bool = False, **kwargs)

Bases: ContextGenerator

Reads frames from an existing dataset on disk.

property frame_count: int

Number of frames in the active input dataset.

property current_index: int

Current replay index maintained by the dataset generator.

clamp_index(frame_index: int) int

Clamp a frame index to the active dataset bounds.

change_dataset(dataset_dir: str) None

Switch the input dataset and reset replay bookkeeping.

read_frame(frame_index: int) Dict[str, Any]

Return one prepared replay frame at frame_index.

get_frame() Iterator[Dict[str, Any]]

Generator method that yields individual frames as dictionaries.

class s6.app._contextgenerators.BaseGSTContextGenerator(dataset_dir: str | None, log_dir: str | None, pipeline: BasePipeline | None, run_pipeline: bool = True, **keyargs)

Bases: ContextGenerator

Shared helpers for GStreamer-backed context generators.

class s6.app._contextgenerators.RemoteGSTContextGenerator(dataset_dir: str | None, log_dir: str | None, pipeline: BasePipeline | None, run_pipeline: bool = True, **keyargs)

Bases: BaseGSTContextGenerator

Retrieve frames from remote gstreamer camera sources using external config.

get_frame() Iterator[Dict[str, Any]]

Generator method that yields individual frames as dictionaries.

class s6.app._contextgenerators.LocalGSTContextGenerator(dataset_dir: str | None, log_dir: str | None, pipeline: BasePipeline | None, run_pipeline: bool = True, **keyargs)

Bases: BaseGSTContextGenerator

Retrieve frames from local GStreamer camera sources using config.

get_frame() Iterator[Dict[str, Any]]

Generator method that yields individual frames as dictionaries.

class s6.app._contextgenerators.LocalGSTContextGeneratorV2(dataset_dir: str | None, log_dir: str | None, pipeline: BasePipeline | None, run_pipeline: bool = True, **keyargs)

Bases: BaseGSTContextGenerator

Retrieve frames from one combined local GStreamer stream using direct Gst.

get_frame() Iterator[Dict[str, Any]]

Generator method that yields individual frames as dictionaries.