Iceberg (dagster-iceberg)

This library provides an integration with the Iceberg table format.

For more information on getting started, see the Dagster & Iceberg documentation.

I/O Managers

dagster_iceberg.io_manager.arrow.PyArrowIcebergIOManager IOManagerDefinition [source]
preview

This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.

An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using PyArrow.

Examples:

import pandas as pd
import pyarrow as pa
from dagster import Definitions, asset
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)

resources = {
    "io_manager": PyArrowIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pa.Table:
    return pa.Table.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”. The I/O manager will check if the namespace exists in the Iceberg catalog. It does not automatically create the namespace if it does not exist.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pa.Table:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pa.Table):
    # my_table will just contain the data from column "a"
    ...
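To sanity-check the output outside of Dagster, the asset can be materialized in-process and the stored table read back with PyIceberg directly. The sketch below reuses the CATALOG_URI, CATALOG_WAREHOUSE, resources, and iris_dataset names from the example above and assumes the table is registered in the configured "dagster" namespace under a name matching the asset key; treat it as an illustration rather than part of the documented API.

from dagster import materialize
from pyiceberg.catalog import load_catalog

# Materialize the asset in-process using the PyArrow I/O manager defined above.
result = materialize([iris_dataset], resources=resources)
assert result.success

# Assumption: the table name matches the asset key within the "dagster" namespace.
catalog = load_catalog("test", **{"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE})
iris_table = catalog.load_table("dagster.iris_dataset")
print(iris_table.scan().to_arrow().num_rows)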
dagster_iceberg.io_manager.daft.DaftIcebergIOManager IOManagerDefinition [source]
preview

This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.

An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using Daft.

Examples:

import daft as da
import pandas as pd
from dagster import Definitions, asset
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.daft import DaftIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)

resources = {
    "io_manager": DaftIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> da.DataFrame:
    return da.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”. The I/O manager will check if the namespace exists in the Iceberg catalog. It does not automatically create the namespace if it does not exist.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> da.DataFrame:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: da.DataFrame):
    # my_table will just contain the data from column "a"
    ...
dagster_iceberg.io_manager.pandas.PandasIcebergIOManager IOManagerDefinition [source]
preview

This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.

An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using pandas.

Examples:

import pandas as pd
from dagster import Definitions, asset
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.pandas import PandasIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)

resources = {
    "io_manager": PandasIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”. The I/O manager will check if the namespace exists in the Iceberg catalog. It does not automatically create the namespace if it does not exist.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...
dagster_iceberg.io_manager.polars.PolarsIcebergIOManager IOManagerDefinition [source]
preview

This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.

An I/O manager definition that reads inputs from and writes outputs to Iceberg tables using Polars.

Examples:

import pandas as pd
import polars as pl
from dagster import Definitions, asset
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.polars import PolarsIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)

resources = {
    "io_manager": PolarsIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pl.DataFrame:
    return pl.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”. The I/O manager will check if the namespace exists in the Iceberg catalog. It does not automatically create the namespace if it does not exist.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pl.DataFrame:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pl.DataFrame):
    # my_table will just contain the data from column "a"
    ...
dagster_iceberg.io_manager.spark.SparkIcebergIOManager IOManagerDefinition [source]
preview

This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.

Base class for Dagster IO managers that utilize structured config. This base class is useful for cases in which the returned IO manager is not the same as the class itself (e.g. when it is a wrapper around the actual IO manager implementation).

This class is a subclass of both IOManagerDefinition and Config. Implementers should provide an implementation of the create_io_manager() method, which should return an instance of IOManager.

Example definition:

class ExternalIOManager(IOManager):

    def __init__(self, connection):
        self._connection = connection

    def handle_output(self, context, obj):
        ...

    def load_input(self, context):
        ...


class ConfigurableExternalIOManager(ConfigurableIOManagerFactory):
    username: str
    password: str

    def create_io_manager(self, context) -> IOManager:
        with database.connect(self.username, self.password) as connection:
            return ExternalIOManager(connection)


defs = Definitions(
    ...,
    resources={
        "io_manager": ConfigurableExternalIOManager(
            username="dagster",
            password=EnvVar("DB_PASSWORD"),
        )
    },
)
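The example above illustrates the generic ConfigurableIOManagerFactory pattern rather than Iceberg specifically. As a sketch only, and assuming SparkIcebergIOManager accepts the same name/config/namespace arguments as the other I/O managers on this page (an assumption; check the linked source for the actual constructor and any Spark-specific settings), wiring it up might look like this:

from dagster import Definitions
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.spark import SparkIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)

defs = Definitions(
    assets=[],  # add assets that return PySpark DataFrames here
    resources={
        # Assumption: same constructor pattern as the Arrow/pandas/Polars managers.
        "io_manager": SparkIcebergIOManager(
            name="test",
            config=IcebergCatalogConfig(
                properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
            ),
            namespace="dagster",
        )
    },
)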

Resources

dagster_iceberg.resource.IcebergTableResource ResourceDefinition [source]
preview

This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.

Resource for interacting with a PyIceberg table.

Example:

from dagster import Definitions, asset
from dagster_iceberg import IcebergTableResource
from dagster_iceberg.config import IcebergCatalogConfig


@asset
def my_table(iceberg_table: IcebergTableResource):
    df = iceberg_table.load().to_pandas()


warehouse_path = "/path/to/warehouse"

defs = Definitions(
    assets=[my_table],
    resources={
        "iceberg_table": IcebergTableResource(
            name="my_catalog",
            config=IcebergCatalogConfig(
                properties={
                    "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                    "warehouse": f"file://{warehouse_path}",
                }
            ),
            table="my_table",
            namespace="my_namespace",
        )
    },
)
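Because load() returns a PyIceberg table, a filter or projection can be pushed down to the Iceberg scan before converting to pandas. The snippet below is a sketch that relies on PyIceberg's Table.scan(row_filter=..., selected_fields=...) API and assumes the table has species and sepal_length_cm columns, as in the iris examples above.

@asset
def setosa_lengths(iceberg_table: IcebergTableResource):
    # Push the filter and projection down to the Iceberg scan rather than
    # loading the full table into memory first.
    scan = iceberg_table.load().scan(
        row_filter="species == 'setosa'",
        selected_fields=("sepal_length_cm", "species"),
    )
    return scan.to_pandas()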

Config

class dagster_iceberg.config.IcebergCatalogConfig [source]
preview

This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.

Configuration for Iceberg Catalogs.

See the Catalogs section for configuration options.

You can configure the Iceberg IO manager:

  1. Using a .pyiceberg.yaml configuration file.
  2. Through environment variables.
  3. Using the IcebergCatalogConfig configuration object.

For more information about the first two configuration options, see Setting Configuration Values.

Example:

from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

warehouse_path = "/path/to/warehouse"

io_manager = PyArrowIcebergIOManager(
    name="my_catalog",
    config=IcebergCatalogConfig(
        properties={
            "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
            "warehouse": f"file://{warehouse_path}",
        }
    ),
    namespace="my_namespace",
)
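As a sketch of option 1 above, the same catalog can instead be declared in a .pyiceberg.yaml file (PyIceberg's configuration file format) or via PYICEBERG_CATALOG__...-style environment variables and then referenced by name alone. This assumes the config argument may be omitted when the catalog is resolvable from that external configuration, as the list above implies.

# Contents of ~/.pyiceberg.yaml (PyIceberg configuration file format):
#
#   catalog:
#     my_catalog:
#       uri: sqlite:////path/to/warehouse/pyiceberg_catalog.db
#       warehouse: file:///path/to/warehouse
#
# With the catalog defined there, only the catalog name and namespace are passed:
io_manager = PyArrowIcebergIOManager(
    name="my_catalog",
    namespace="my_namespace",
)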

Base Classes

class dagster_iceberg.io_manager.base.IcebergIOManager [source]
preview

This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.

Base class for an I/O manager definition that reads inputs from and writes outputs to Iceberg tables.

Examples:

import pandas as pd
import pyarrow as pa
from dagster import Definitions, asset
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": PyArrowIcebergIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pa.Table:
    return pa.Table.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”. The I/O manager will check if the namespace exists in the Iceberg catalog. It does not automatically create the namespace if it does not exist.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pa.Table:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pa.Table):
    # my_table will just contain the data from column "a"
    ...
class dagster_iceberg.handler.IcebergBaseTypeHandler [source]
preview

This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use.
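
This base class has no example on this page. As an illustration of the pattern used by the concrete I/O managers above, a custom handler subclasses IcebergBaseTypeHandler and is returned from a custom IcebergIOManager's type_handlers(). The method and property names below (to_arrow, to_data_frame, supported_types, type_handlers) are assumptions inferred from how the bundled handlers appear to be structured, so treat this as a sketch rather than the definitive API.

from collections.abc import Sequence

import pyarrow as pa

from dagster_iceberg.handler import IcebergBaseTypeHandler
from dagster_iceberg.io_manager.base import IcebergIOManager


class RecordBatchReaderTypeHandler(IcebergBaseTypeHandler):
    """Sketch: stores and loads pyarrow.RecordBatchReader objects."""

    def to_arrow(self, obj: pa.RecordBatchReader) -> pa.Table:
        # Assumed hook: convert the in-memory object to an Arrow table for writing.
        return obj.read_all()

    def to_data_frame(self, table, table_slice, target_type):
        # Assumed hook: `table` is the PyIceberg table resolved by the I/O manager;
        # convert it back into the in-memory type requested by the downstream asset.
        return table.scan().to_arrow().to_reader()

    @property
    def supported_types(self) -> Sequence[type]:
        return [pa.RecordBatchReader]


class RecordBatchIcebergIOManager(IcebergIOManager):
    @staticmethod
    def type_handlers() -> Sequence:
        return [RecordBatchReaderTypeHandler()]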