from ast import literal_eval
from pathlib import Path
from pprint import pprint
from sys import version_info
from typing import Type, Union
from ccflow import CallableModel, ContextType, Flow, ResultType
from lerna import compose, initialize_config_dir
from lerna.utils import instantiate
from nbformat import NotebookNode, read as nb_read
from nbformat.v4 import new_notebook
from pkn import getSimpleLogger
from pydantic import Field, PrivateAttr, field_validator, model_validator
from typing_extensions import Self
from nbprint import __version__
from nbprint.config.base import BaseModel, Role, _append_or_extend
from nbprint.config.block_runtime import NBPRINT_BLOCK_MIME
from nbprint.config.cell import NBPRINT_MIME
from nbprint.config.common import Style
from nbprint.config.content import Content, ContentCode, ContentMarkdown
from nbprint.config.exceptions import NBPrintPathIsYamlError, NBPrintPathOrModelMalformedError
from nbprint.config.magic import _parse_magic_line
from nbprint.config.overlay import LayoutOverlay, Overlay, apply_layout_overlays, apply_overlays
from nbprint.config.page_runtime import NBPRINT_PAGE_MIME
from .content import SECTION_ORDER, ContentMarshall
from .context import Context
from .outputs import Outputs, OutputsProcessing
from .page import Page, PageGlobal
from .parameters import PapermillParameters, Parameters
# Names exported by ``from <module> import *``.
__all__ = ("Configuration", "load")

# Module logger; used to warn about unsupported notebook cell types.
_log = getSimpleLogger("nbprint.config.core.config")

# Prefix of the cell tags that route a cell to a named content section.
_SECTION_TAG_PREFIX = "nbprint:section:"
[docs]
# Root nbprint configuration model. It aggregates the report's outputs,
# parameters, page, context and content, can ingest an existing notebook
# (see ``_append_notebook_content``), emits a notebook via ``generate`` and
# is callable through ccflow (see ``__call__``).
class Configuration(CallableModel, BaseModel):
# Report name (required); ``load`` supplies it as a ``+name=...`` override.
name: str
# Named resources; dict values are coerced with ``BaseModel._to_type``
# before validation.
resources: dict[str, BaseModel] = Field(default_factory=dict)
# Output handler (required); ``run`` hands the generated notebook to it.
outputs: Outputs
# Report parameters; also mirrored onto ``context.parameters`` by ``_validate``.
parameters: Parameters = Field(default_factory=Parameters)
# Global page settings; defaults to a ``PageGlobal`` instance.
page: Page = Field(default_factory=PageGlobal)
# Context object whose cells are emitted right after the configuration cell.
context: Context = Field(default_factory=Context)
# Report content grouped into sections; a list or dict is coerced by
# ``convert_content_from_obj``.
content: ContentMarshall = Field(default_factory=ContentMarshall)
# Formatting overlays — applied during notebook ingestion only.
overlays: list[Overlay] = Field(
default_factory=list,
description="Formatting overlays merged into ingested notebook cells.",
)
layout_overlays: list[LayoutOverlay] = Field(
default_factory=list,
description="Layout overlays that wrap contiguous ranges of ingested cells in a flex container.",
)
# basic metadata
# ``_ensure_tags`` guarantees that "nbprint:config" is always in this list.
tags: list[str] = Field(default_factory=list)
role: Role = Role.CONFIGURATION
# NOTE(review): semantics of ``ignore`` are defined by the base model — confirm.
ignore: bool = True
# Copied into the configuration cell's metadata by ``_generate_self``.
pagedjs: bool = True
# When true, ``run`` pretty-prints the generated notebook; also copied into
# the configuration cell's metadata.
debug: bool = True
# internals
# When true, ``run`` skips the post-processing step.
_multi: bool = PrivateAttr(default=False)
# NOTE(review): presumably the variable name bound to this configuration
# inside generated notebooks — it is not referenced in this class; confirm.
_nb_var_name: str = PrivateAttr(default="nbprint_config")
# Per-run bookkeeping; ``_reset`` replaces it with an empty set.
_nb_vars: set = PrivateAttr(default_factory=set)
@field_validator("tags", mode="after")
@classmethod
def _ensure_tags(cls, v: list[str]) -> list[str]:
    """Guarantee that the ``nbprint:config`` marker tag is present.

    The incoming list is modified in place (the marker is appended when it
    is missing) and then returned.
    """
    marker = "nbprint:config"
    if marker in v:
        return v
    v.append(marker)
    return v
@field_validator("resources", mode="before")
@classmethod
def _convert_resources_from_obj(cls, value) -> dict[str, BaseModel]:
    """Coerce raw ``resources`` input before field validation.

    ``None`` becomes an empty dict. For a dict, every value is passed through
    ``BaseModel._to_type`` and stored back under its key (in place). Any other
    input is returned unchanged for pydantic to validate.
    """
    resources = {} if value is None else value
    if isinstance(resources, dict):
        # Snapshot the keys; only existing entries are rewritten.
        for key in list(resources):
            resources[key] = BaseModel._to_type(resources[key])
    return resources
@field_validator("outputs", mode="before")
@classmethod
def _convert_outputs_from_obj(cls, v) -> Outputs:
    """Coerce raw ``outputs`` input via ``BaseModel._to_type`` with ``Outputs`` as the target."""
    coerced = BaseModel._to_type(v, Outputs)
    return coerced
@field_validator("parameters", mode="before")
@classmethod
def _convert_parameters_from_obj(cls, v) -> Parameters:
    """Coerce raw ``parameters`` input via ``BaseModel._to_type`` with ``Parameters`` as the target."""
    coerced = BaseModel._to_type(v, Parameters)
    return coerced
@field_validator("page", mode="before")
@classmethod
def _convert_page_from_obj(cls, v) -> PageGlobal:
    """Coerce raw ``page`` input via ``BaseModel._to_type`` with ``PageGlobal`` as the target."""
    coerced = BaseModel._to_type(v, PageGlobal)
    return coerced
@field_validator("context", mode="before")
@classmethod
def _convert_context_from_obj(cls, v) -> Context:
    """Coerce raw ``context`` input via ``BaseModel._to_type`` with ``Context`` as the target."""
    coerced = BaseModel._to_type(v, Context)
    return coerced
@staticmethod
def _convert_content_from_list(v) -> ContentMarshall:
    """Wrap a list of content items in a ``ContentMarshall`` as its ``middlematter``.

    The list is rewritten in place: string items become ``Content(type_=item)``,
    dict items are passed through ``BaseModel._to_type``; anything else is kept.
    """
    for position, item in enumerate(v):
        if isinstance(item, str):
            replacement = Content(type_=item)
        elif isinstance(item, dict):
            replacement = BaseModel._to_type(item)
        else:
            # Already-constructed items are left untouched.
            continue
        v[position] = replacement
    return ContentMarshall(middlematter=v)
@staticmethod
def _convert_content_from_dict(v) -> ContentMarshall:
    """Build a ``ContentMarshall`` from a dict keyed by its field names.

    Every list-valued entry whose key is a ``ContentMarshall`` field is first
    converted with ``_convert_content_from_list`` and replaced (in place) by
    the ``all`` attribute of the resulting marshall.
    """
    for field_name in ContentMarshall.model_fields:
        raw_items = v.get(field_name)
        if not isinstance(raw_items, list):
            continue
        converted = Configuration._convert_content_from_list(raw_items)
        v[field_name] = converted.all
    return ContentMarshall(**v)
[docs]
@field_validator("content", mode="before")
@classmethod
def convert_content_from_obj(cls, v) -> ContentMarshall:
    """Coerce raw ``content`` input before field validation.

    ``None`` yields an empty ``ContentMarshall``; a list or dict is converted
    by the matching helper; any other value is returned unchanged.
    """
    if v is None:
        result = ContentMarshall()
    elif isinstance(v, list):
        result = cls._convert_content_from_list(v)
    elif isinstance(v, dict):
        result = cls._convert_content_from_dict(v)
    else:
        result = v
    return result
@model_validator(mode="after")
def _validate(self) -> Self:
    """Post-validation hook: share the parameters with the context and recompute outputs."""
    current_parameters = self.parameters
    self.context.parameters = current_parameters
    self.outputs._compute_outputs(config=self)
    return self
# NOTE: this should have been possible via a wrap validator,
# but I could not get it to work.
#
# Intercepts assignment to ``parameters`` (when parameters are already set)
# so new values are coerced to a parameters model and then either replace
# the existing object or are merged into it. Every other attribute
# assignment is forwarded to the parent implementation unchanged.
def __setattr__(self, name: str, value) -> None:
if name == "parameters" and self.parameters:
# A dict carrying ``type_`` goes through ``BaseModel._to_type``; anything
# else is validated as ``PapermillParameters``.
value = BaseModel._to_type(value, Parameters) if isinstance(value, dict) and "type_" in value else PapermillParameters.model_validate(value)
# Exact type comparison: a different parameters class means replacement.
if type(value) is not type(self.parameters):
# replace wholesale
super().__setattr__(name, value)
else:
# Union new parameters with existing parameters
# Only explicitly-set fields are copied (``exclude_unset=True``) and
# ``type_`` is never copied.
for k, v in value.model_dump(mode="json", exclude_unset=True, exclude={"type_"}).items() if isinstance(value, Parameters) else value.items():
setattr(self.parameters, k, v)
# Re-run the post-validation hook (re-links context parameters and
# recomputes outputs).
self._validate()
return None
return super().__setattr__(name, value)
@staticmethod
def _init_content(values) -> None:
    """Normalise ``values["content"]`` to a ``ContentMarshall``, in place.

    Used before notebook cells are appended so that each section list exists.

    * missing or ``None``    -> empty ``ContentMarshall``
    * ``ContentMarshall``    -> kept as is
    * list                   -> ``ContentMarshall(middlematter=<list>)``
    * dict                   -> ``ContentMarshall(**<dict>)``

    ``None`` and ready-made ``ContentMarshall`` instances are accepted for
    consistency with the ``content`` field validator, which accepts both.

    Args:
        values: the raw, mutable input mapping of the model.

    Raises:
        RuntimeError: if ``content`` has any other type.
    """
    content = values.get("content")
    if content is None:
        values["content"] = ContentMarshall()
        return
    if isinstance(content, ContentMarshall):
        return
    if isinstance(content, list):
        values["content"] = ContentMarshall(middlematter=content)
    elif isinstance(content, dict):
        values["content"] = ContentMarshall(**content)
    else:
        e = f"Unexpected content format when loading from notebook: {type(values['content'])}"
        raise RuntimeError(e)
@staticmethod
def _cell_to_content(cell) -> Content | None:
    """Convert one notebook cell into a ``Content`` model.

    Metadata is merged from three sources, highest priority first:

    1. ``cell.metadata.nbprint`` (written by the UI extension or by a
       previous generation),
    2. nbprint MIME output found in the cell's outputs,
    3. a leading ``%%nbprint key=value ...`` magic line.

    Args:
        cell: a notebook cell node (dict-like, with ``source`` and
            ``cell_type`` attributes).

    Returns:
        The content model, or ``None`` when the cell is empty or its
        ``cell_type`` is neither ``code`` nor ``markdown`` (a warning is
        logged in the latter case).
    """
    source = cell.source.strip()
    if not source:
        # skip empty cells
        return None

    cell_metadata = cell.get("metadata", {})

    # Cells may carry nbprint metadata from the UI extension, e.g.
    #   "nbprint": {"attrs": "", "class": "", "css": "", "data": "{}",
    #               "role": "parameters", "type_": "nbprint.config...."}
    nbprint_cell_meta = dict(cell_metadata["nbprint"]) if "nbprint" in cell_metadata else {}

    # These fields are generated during output and are not part of the
    # input model — drop them to avoid validation errors.
    # `class` is re-derived from `classname` during generation.
    for generated_key in ("class", "class_selector", "element_selector", "data", "parent-id"):
        nbprint_cell_meta.pop(generated_key, None)

    # Preserve `attrs` if it is a dict (the input format); drop it if it
    # was serialized as a string during generation.
    if isinstance(nbprint_cell_meta.get("attrs"), str):
        nbprint_cell_meta.pop("attrs", None)

    # Runtime metadata from MIME output augments but never overwrites
    # explicit cell.metadata.nbprint values.
    runtime_meta = Configuration._extract_nbprint_mime(cell)
    if runtime_meta:
        for key, value in runtime_meta.items():
            nbprint_cell_meta.setdefault(key, value)

    # Runtime metadata from the %%nbprint magic. ``None`` means "no magic
    # line"; an empty dict means a bare ``%%nbprint`` line, which must still
    # be stripped so the stored content is the real code, not the directive.
    magic_meta = Configuration._extract_nbprint_magic(source)
    if magic_meta is not None:
        for key, value in magic_meta.items():
            nbprint_cell_meta.setdefault(key, value)
        source = "\n".join(source.splitlines()[1:])

    # Attach cell tags to the content
    if "tags" in cell_metadata:
        nbprint_cell_meta["tags"] = cell_metadata["tags"]

    # Ensure source is captured as the content payload. Metadata that
    # already declares ``content`` is left untouched; otherwise the cell
    # source becomes the content.
    nbprint_cell_meta.setdefault("content", source)

    # If this is an nbprint-defined type, use that
    if "type_" in nbprint_cell_meta:
        content_model = BaseModel._to_type({"type_": nbprint_cell_meta["type_"]}, Content)
        return content_model.model_validate(nbprint_cell_meta)

    # Default handling: treat as code or markdown content
    if cell.cell_type == "code":
        return ContentCode.model_validate(nbprint_cell_meta)
    if cell.cell_type == "markdown":
        return ContentMarkdown.model_validate(nbprint_cell_meta)

    # Unsupported cell type (e.g. raw): warn and skip it.
    _log.warning(f"Unsupported cell type when loading from notebook: {cell.cell_type}")
    return None
@staticmethod
def _extract_nbprint_mime(cell) -> dict | None:
    """Return nbprint runtime metadata found in a cell's outputs, if any.

    Each output's ``data`` bundle is searched, in order, for the cell, page
    and block nbprint MIME types. The first payload that is a string is
    JSON-decoded and returned; the first payload that is a dict is returned
    as is. Payloads of any other type are skipped. ``None`` is returned
    when nothing matches.
    """
    import json as _json

    nbprint_mimes = (NBPRINT_MIME, NBPRINT_PAGE_MIME, NBPRINT_BLOCK_MIME)
    for output in cell.get("outputs", []):
        bundle = output.get("data", {})
        for mime in nbprint_mimes:
            if mime not in bundle:
                continue
            payload = bundle[mime]
            if isinstance(payload, str):
                return _json.loads(payload)
            if isinstance(payload, dict):
                return payload
    return None
@staticmethod
def _extract_nbprint_magic(source: str) -> dict | None:
"""Parse ``%%nbprint key=value ...`` from the first line of source.
Returns the parsed kwargs dict, or ``None`` if the cell does not
start with the magic.
"""
first_line = source.split("\n", 1)[0].strip()
if not first_line.startswith("%%nbprint"):
return None
# Strip the ``%%nbprint`` prefix and parse the rest
line = first_line[len("%%nbprint") :].strip()
if not line:
return {}
return _parse_magic_line(line)
@staticmethod
def _parse_parameters_cell(cell) -> dict:
new_parameters = {}
param_lines = cell.source.splitlines()
for line in param_lines:
if "=" in line:
key, value = line.split("=", 1)
key = key.strip()
value = value.strip()
# Attempt to eval the value to get correct type
try:
evaluated_value = literal_eval(value)
except SyntaxError:
evaluated_value = value
new_parameters[key] = evaluated_value
return new_parameters
@staticmethod
def _extract_section_for_cell(cell) -> str | None:
    """Work out which content section a cell should be routed to.

    Sources, in priority order:

    1. ``cell.metadata.nbprint.section`` — an explicit section name.
    2. A cell tag of the form ``nbprint:section:<name>``.

    A name only counts when it is listed in ``SECTION_ORDER``. Returns the
    section name, or ``None`` when nothing valid is found (the caller then
    falls back to ``middlematter``).
    """
    metadata = cell.get("metadata", {})

    # 1. Explicit nbprint metadata
    explicit = metadata.get("nbprint", {}).get("section")
    if explicit and explicit in SECTION_ORDER:
        return explicit

    # 2. Tag-based routing
    prefix_length = len(_SECTION_TAG_PREFIX)
    for tag in metadata.get("tags", []):
        if not tag.startswith(_SECTION_TAG_PREFIX):
            continue
        section_name = tag[prefix_length:]
        if section_name in SECTION_ORDER:
            return section_name
    return None
@staticmethod
def _process_cells(values, nb_content: NotebookNode) -> None:
    """Ingest the cells of ``nb_content`` into ``values["content"]``.

    Steps:

    1. If the first cell is tagged ``parameters`` it is parsed as a
       parameters cell and excluded from content.
    2. Overlay specs in ``values`` (instances or dicts) are resolved.
    3. Each remaining cell is converted to content, routed to a section
       (cell metadata/tags first, then MIME output, then ``%%nbprint``
       magic, else ``middlematter``), formatted by the overlays and appended.
    4. Layout overlays are applied once every cell has been placed.
    5. Parameters parsed from the notebook fill in values that are still
       unset; they never override an existing non-``None`` value.

    Args:
        values: raw, mutable model input; ``values["content"]`` must already
            be a ``ContentMarshall``.
        nb_content: the notebook to ingest.
    """
    new_parameters = {}
    cells_to_process = nb_content.cells
    if cells_to_process and "metadata" in cells_to_process[0] and "parameters" in cells_to_process[0]["metadata"].get("tags", []):
        # Parse the first cell for parameters and skip it as content.
        first_cell = cells_to_process[0]
        cells_to_process = cells_to_process[1:]
        # Make sure there is a parameters object to receive the values.
        if "parameters" not in values:
            values["parameters"] = PapermillParameters()
        new_parameters = Configuration._parse_parameters_cell(first_cell)

    # Resolve overlays — accept either pre-built instances or dict-shaped
    # specs from YAML/notebook metadata; anything else is ignored.
    overlays: list[Overlay] = [
        spec if isinstance(spec, Overlay) else Overlay.model_validate(spec)
        for spec in (values.get("overlays") or [])
        if isinstance(spec, (Overlay, dict))
    ]
    layout_overlays: list[LayoutOverlay] = [
        spec if isinstance(spec, LayoutOverlay) else LayoutOverlay.model_validate(spec)
        for spec in (values.get("layout_overlays") or [])
        if isinstance(spec, (LayoutOverlay, dict))
    ]

    # placements[i] stays None for cells that produced no content.
    placements: list = [None] * len(cells_to_process)
    for i, cell in enumerate(cells_to_process):
        cell_instance = Configuration._cell_to_content(cell)
        if cell_instance is None:
            continue
        # Raw cell metadata/tags have the highest priority.
        section = Configuration._extract_section_for_cell(cell)
        # Fall back to runtime metadata: MIME output, then %%nbprint magic.
        if section is None:
            runtime_meta = Configuration._extract_nbprint_mime(cell) or {}
            magic_meta = Configuration._extract_nbprint_magic(cell.source.strip()) or {}
            runtime_section = runtime_meta.get("section") or magic_meta.get("section")
            if runtime_section and runtime_section in SECTION_ORDER:
                section = runtime_section
        target_section = section or "middlematter"
        # Apply formatting overlays
        apply_overlays(overlays, cell=cell, content=cell_instance, index=i, section=target_section)
        getattr(values["content"], target_section).append(cell_instance)
        placements[i] = (target_section, cell_instance)

    # Apply layout-wrapping overlays after all cells are placed.
    if layout_overlays:
        apply_layout_overlays(layout_overlays, cells_to_process, placements, values["content"])

    if new_parameters:
        # NOTE(review): assumes values["parameters"] is already a model
        # instance here, not a raw dict — confirm against callers.
        parameters = values["parameters"]
        # Read model_fields from the class; instance access is deprecated
        # in recent pydantic releases.
        declared_fields = type(parameters).model_fields
        for k, v in new_parameters.items():
            if k in declared_fields and getattr(parameters, k) is None:
                setattr(parameters, k, v)
            elif isinstance(parameters, PapermillParameters) and (k not in parameters.vars or parameters.vars[k] is None):
                parameters.vars[k] = v
@model_validator(mode="before")
@classmethod
def _append_notebook_content(cls, values) -> object:
    """Ingest the notebook named by ``values["notebook"]`` before validation.

    When the raw input is a dict with a non-``None`` ``notebook`` entry, the
    entry is removed, the notebook is read (nbformat v4) and:

    * notebook-level ``metadata.nbprint.page`` / ``outputs`` are used as
      defaults — explicit entries in ``values`` take precedence;
    * notebook-level ``overlays`` / ``layout_overlays`` are appended after
      any explicit ones;
    * the notebook cells are converted and appended to ``values["content"]``.

    Args:
        values: raw validator input. A "before" validator may receive
            something other than a dict; such input is passed through.

    Returns:
        The (possibly updated) input, for pydantic to validate.
    """
    if not isinstance(values, dict) or values.get("notebook") is None:
        return values

    cls._init_content(values)

    notebook_path = Path(values.pop("notebook"))
    with notebook_path.open("r", encoding="utf-8") as notebook_file:
        nb_content = nb_read(notebook_file, as_version=4)

    # Extract notebook-level nbprint metadata for page and output config.
    nb_metadata = nb_content.metadata
    nb_meta = getattr(nb_metadata, "nbprint", None) or (
        nb_metadata.get("nbprint") if isinstance(nb_metadata, dict) else None
    )
    if nb_meta:
        nb_nbprint = nb_meta if isinstance(nb_meta, dict) else dict(nb_meta)
        # Low-priority defaults: only used when ``values`` has no such key.
        for key in ("page", "outputs"):
            if key in nb_nbprint:
                values.setdefault(key, nb_nbprint[key])
        # Notebook overlays are appended to any explicit overlays so that
        # YAML-defined overlays still apply.
        for key in ("overlays", "layout_overlays"):
            if key in nb_nbprint:
                values[key] = [*(values.get(key) or []), *nb_nbprint[key]]

    cls._process_cells(values, nb_content)
    return values
[docs]
def generate(self, **_) -> NotebookNode:
    """Generate the notebook described by this configuration.

    Cells are emitted in this order: parameters, the configuration itself,
    context, page layout, every content item section by section, and
    finally the outputs cell. Content cells are tagged with
    ``nbprint:section:<section>`` and ``nbprint:section-group:<group>``.

    Note that a section's default style is merged into each content item's
    ``style`` attribute as a side effect.

    Args:
        **_: ignored.

    Returns:
        A single notebook node.
    """
    from copy import deepcopy

    nb = new_notebook()
    nb.metadata.nbprint = {}
    nb.metadata.nbprint.version = __version__
    nb.metadata.nbprint.tags = []
    nb.metadata.nbprint.nbprint = {}
    nb.metadata.nbprint.language = f"python{version_info.major}.{version_info.minor}"

    base_meta = {
        "tags": [],
        "nbprint": {},
    }

    def fresh_meta() -> dict:
        # Deep copy: a shallow copy would hand the same nested ``tags`` list
        # and ``nbprint`` dict to every generate call.
        return deepcopy(base_meta)

    nb.cells = []

    # Start with parameters for papermill compatibility. ``parent=None``
    # because parameters is the first cell and the config is not
    # instantiated until the next cell.
    _append_or_extend(nb.cells, self.parameters.generate(metadata=fresh_meta(), config=self, parent=None))

    # now do the configuration itself
    _append_or_extend(nb.cells, self._generate_self(metadata=fresh_meta()))

    # now do the context object
    # pass in parent=self, attr=context so we do config.context
    _append_or_extend(nb.cells, self.context.generate(metadata=fresh_meta(), config=self, parent=self, attr="context"))

    # TODO: resources are omitted, referenced directly in yaml
    # TODO: outputs are skipped here, consumed internally

    # now setup the page layout
    # pass in parent=self, attr=page so we do config.page
    _append_or_extend(nb.cells, self.page.generate(metadata=fresh_meta(), config=self, parent=self, attr="page"))

    # now iterate through the content, section by section
    content_counter = 0
    for section_name, group_name, section_contents in self.content.sections():
        section_default_style = self.content.section_styles.get(section_name)
        section_tag = f"{_SECTION_TAG_PREFIX}{section_name}"
        group_tag = f"nbprint:section-group:{group_name}"
        for content in section_contents:
            # The section default is merged with the content's own style;
            # content style fields override the section default.
            if section_default_style is not None and isinstance(content, Content):
                if isinstance(content.style, Style):
                    content.style = section_default_style.merge(content.style)
                elif content.style is None:
                    content.style = section_default_style

            cells = content.generate(metadata=fresh_meta(), config=self, parent=self, attr="content", counter=content_counter)

            # Tag generated cells with section metadata
            tagged = cells if isinstance(cells, list) else [cells] if cells is not None else []
            for cell in tagged:
                if section_tag not in cell.metadata.tags:
                    cell.metadata.tags.append(section_tag)
                if group_tag not in cell.metadata.tags:
                    cell.metadata.tags.append(group_tag)
            _append_or_extend(nb.cells, cells)
            content_counter += 1

    # Finally, the outputs cell.
    # NOTE: the outputs cell usually does nothing unless it is set to run
    # in-context, in which case it only executes inside the notebook and
    # not outside.
    _append_or_extend(nb.cells, self.outputs.generate(metadata=fresh_meta(), config=self, parent=self, attr="outputs"))
    return nb
def _generate_self(self, metadata: dict) -> NotebookNode:
    """Build the notebook cell that represents the configuration itself.

    The cell comes from the parent ``_base_generate``; its nbprint metadata
    then has ``data`` blanked and receives the name, the debug and pagedjs
    flags, and the JSON dumps of outputs, parameters and page. Context and
    content are deliberately left out (size / already in the notebook).
    """
    cell = super()._base_generate(metadata=metadata, config=self)
    nbprint_meta = cell.metadata.nbprint

    # omit the data
    nbprint_meta.data = ""

    # add extras
    nbprint_meta.name = self.name
    nbprint_meta.debug = self.debug
    nbprint_meta.pagedjs = self.pagedjs

    # add core elements
    # NOTE: stored as JSON strings so pydantic types are properly serialized
    nbprint_meta.outputs = self.outputs.model_dump_json(by_alias=True)
    nbprint_meta.parameters = self.parameters.model_dump_json(by_alias=True)
    nbprint_meta.page = self.page.model_dump_json(by_alias=True)
    return cell
def _generate_resources_cells(self, metadata: dict | None = None) -> NotebookNode:
    """Build a base cell for resources with its nbprint ``data`` blanked.

    Resource emission itself is not implemented yet; only the base cell
    (generated with ``config=None``) is returned.
    """
    resources_cell = super()._base_generate(metadata=metadata, config=None)
    # omit the data
    resources_cell.metadata.nbprint.data = ""
    # TODO: add resources
    return resources_cell
[docs]
@staticmethod
def load(path_or_model: Union[str, Path, dict, "Configuration"], name: str) -> "Configuration":
    """Load a ``Configuration`` from a YAML path, or pass one through.

    * A ``Configuration`` instance is returned unchanged.
    * A string ending in ``.yml`` raises ``NBPrintPathIsYamlError``.
    * A string ending in ``.yaml`` or a ``Path`` is resolved, composed
      with ``name`` injected as a ``+name=...`` override, and instantiated;
      a result that is not a ``Configuration`` is validated into one.
    * Anything else (including a dict or any other string) raises
      ``NBPrintPathOrModelMalformedError``.
    """
    if isinstance(path_or_model, Configuration):
        return path_or_model

    if isinstance(path_or_model, str):
        if path_or_model.endswith(".yml"):
            raise NBPrintPathIsYamlError(path_or_model)
        if path_or_model.endswith(".yaml"):
            path_or_model = Path(path_or_model).resolve()

    if not isinstance(path_or_model, Path):
        raise NBPrintPathOrModelMalformedError(path_or_model)

    config_path = path_or_model.resolve()
    config_dir = str(config_path.parent)
    config_name = str(config_path.name)
    with initialize_config_dir(version_base=None, config_dir=config_dir, job_name=name):
        cfg = compose(config_name=config_name, overrides=[f"+name={name}"])
        config = instantiate(cfg, _convert_="all")
        if isinstance(config, Configuration):
            return config
        return Configuration.model_validate(config)
[docs]
def run(self, dry_run: bool = False, *, _multi: bool = False) -> Path | None:
    """Generate the notebook and, unless ``dry_run``, hand it to the outputs.

    Args:
        dry_run: only generate (and, in debug mode, print) the notebook.
        _multi: set when this run is part of a multi-configuration run;
            post-processing is then skipped here. The private ``_multi``
            attribute has the same effect.

    Returns:
        Whatever ``outputs.run`` returned, or ``None`` for a dry run, a
        handled problem, or a requested stop.
    """
    try:
        gen = self.generate()
        if self.debug:
            pprint(gen)
        if dry_run:
            return None

        ret = self.outputs.run(self, gen)
        if ret in (None, OutputsProcessing.STOP):
            # Either a handled problem or user requested stop
            # Return None to indicate a problem
            # TODO: revisit
            return None

        if not (_multi or self._multi) and self.outputs.postprocess:
            # Run postprocessing
            self.outputs.postprocess.object([self])
        return ret
    finally:
        # Reset on every exit path (dry run, early stop, error) so that
        # per-run state never leaks into the next run.
        self._reset()
def _reset(self) -> None:
    """Clear per-run state so the configuration can be run again."""
    self.context._context_generated = False
    self._nb_vars = set()
# ccflow integration
@property
def context_type(self) -> Type[ContextType]:
    """Class of the current ``parameters`` object, exposed as the ccflow context type."""
    return type(self.parameters)
@property
def result_type(self) -> Type[ResultType]:
    """Class of the current ``outputs`` object, exposed as the ccflow result type."""
    return type(self.outputs)
@Flow.call
def __call__(self, context):  # noqa: ANN204
    """Run the report for the given ccflow context and return the outputs.

    When ``context`` differs from the current parameters it is assigned to
    ``parameters`` (which goes through the custom ``__setattr__``) before
    the run.

    NOTE(review): the parameters are updated on this instance; no copy of
    the configuration is made here.
    """
    parameters_changed = context != self.parameters
    if parameters_changed:
        self.parameters = context
    self.run()
    return self.outputs
# Module-level alias of ``Configuration.load``; exported through ``__all__``.
load = Configuration.load