# dlt.sources.sql_database
Source that loads tables from any SQLAlchemy-supported database; supports batched requests and incremental loads.
## sql_database
```python
@decorators.source
def sql_database(
    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
    schema: Optional[str] = dlt.config.value,
    metadata: Optional[MetaData] = None,
    table_names: Optional[List[str]] = dlt.config.value,
    chunk_size: int = 50000,
    backend: TableBackend = "sqlalchemy",
    detect_precision_hints: Optional[bool] = False,
    reflection_level: Optional[ReflectionLevel] = "full",
    defer_table_reflect: Optional[bool] = None,
    table_adapter_callback: Optional[TTableAdapter] = None,
    backend_kwargs: Dict[str, Any] = None,
    include_views: bool = False,
    type_adapter_callback: Optional[TTypeAdapter] = None,
    query_adapter_callback: Optional[TQueryAdapter] = None,
    resolve_foreign_keys: bool = False,
    engine_adapter_callback: Optional[Callable[[Engine], Engine]] = None
) -> Iterable[DltResource]
```
A dlt source which loads data from an SQL database using SQLAlchemy. Resources are automatically created for each table in the schema or from the given list of tables.
**Arguments**:

- `credentials` _Union[ConnectionStringCredentials, Engine, str]_ - Database credentials or an `sqlalchemy.Engine` instance.
- `schema` _Optional[str]_ - Name of the database schema to load (if different from the default).
- `metadata` _Optional[MetaData]_ - Optional `sqlalchemy.MetaData` instance. The `schema` argument is ignored when this is used.
- `table_names` _Optional[List[str]]_ - A list of table names to load. By default, all tables in the schema are loaded.
- `chunk_size` _int_ - Number of rows yielded in one batch. SQLAlchemy will create an additional internal row buffer of twice the chunk size.
- `backend` _TableBackend_ - Type of backend used to generate table data. One of: "sqlalchemy", "pyarrow", "pandas", and "connectorx". "sqlalchemy" yields batches as lists of Python dictionaries, "pyarrow" and "connectorx" yield batches as Arrow tables, and "pandas" yields pandas DataFrames. "sqlalchemy" is the default and does not require additional dependencies; "pyarrow" creates stable destination schemas with correct data types; "connectorx" is typically the fastest but ignores `chunk_size`, so you must deal with large tables yourself.
- `detect_precision_hints` _Optional[bool]_ - Deprecated. Use `reflection_level`. Sets column precision and scale hints for supported data types in the target schema based on the columns in the source tables. Disabled by default.
- `reflection_level` _Optional[ReflectionLevel]_ - Specifies how much information should be reflected from the source database schema.
  - `"minimal"` - Only table names, nullability, and primary keys are reflected. Data types are inferred from the data.
  - `"full"` (default) - Data types are reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary.
  - `"full_with_precision"` - Sets precision and scale on supported data types (e.g. decimal, text, binary). Creates big and regular integer types.
- `defer_table_reflect` _Optional[bool]_ - Connect and reflect the table schema only when yielding data. Requires `table_names` to be explicitly passed. Enable this option when running on Airflow and other orchestrators that create execution DAGs. When True, the schema is decided during execution, which may override `query_adapter_callback` modifications or `apply_hints`.
- `table_adapter_callback` _Optional[TTableAdapter]_ - Receives each reflected table. May be used to modify the list of columns that will be selected.
- `backend_kwargs` _Dict[str, Any]_ - kwargs passed to the table backend, e.g. "conn" is used to pass a specialized connection string to connectorx.
- `include_views` _bool_ - Reflect views as well as tables. Note that view names included in `table_names` are always included regardless of this setting.
- `type_adapter_callback` _Optional[TTypeAdapter]_ - Callable to override type inference when reflecting columns. Its argument is a single sqlalchemy data type (a `TypeEngine` instance), and it should return another sqlalchemy data type, or `None` (in which case the type will be inferred from the data).
- `query_adapter_callback` _Optional[TQueryAdapter]_ - Callable to override the SELECT query used to fetch data from the table. The callback receives the sqlalchemy `Select` and the corresponding `Table`, `Incremental`, and `Engine` objects, and should return the modified `Select` or a `Text` clause.
- `resolve_foreign_keys` _bool_ - Translate foreign keys in the same schema to `references` table hints. May incur additional database calls, as all referenced tables are reflected.
- `engine_adapter_callback` _Optional[Callable[[Engine], Engine]]_ - Callback to configure or modify the `Engine` instance that will be used to open a connection, e.g. to set the transaction isolation level.
**Yields**:

- `DltResource` - dlt resources for each table to be loaded.
## sql_table
```python
@decorators.resource(name=lambda args: args["table"],
                     spec=SqlTableResourceConfiguration)
def sql_table(
    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
    table: str = dlt.config.value,
    schema: Optional[str] = dlt.config.value,
    metadata: Optional[MetaData] = None,
    incremental: Optional[Incremental[Any]] = None,
    chunk_size: int = 50000,
    backend: TableBackend = "sqlalchemy",
    detect_precision_hints: Optional[bool] = None,
    reflection_level: Optional[ReflectionLevel] = "full",
    defer_table_reflect: Optional[bool] = None,
    table_adapter_callback: Optional[TTableAdapter] = None,
    backend_kwargs: Dict[str, Any] = None,
    type_adapter_callback: Optional[TTypeAdapter] = None,
    included_columns: Optional[List[str]] = None,
    excluded_columns: Optional[List[str]] = None,
    query_adapter_callback: Optional[TQueryAdapter] = None,
    resolve_foreign_keys: bool = False,
    engine_adapter_callback: Callable[[Engine], Engine] = None,
    write_disposition: TWriteDispositionConfig = "append",
    primary_key: TColumnNames = None,
    merge_key: TColumnNames = None
) -> DltResource
```
A dlt resource which loads data from an SQL database table using SQLAlchemy.
**Arguments**:

- `credentials` _Union[ConnectionStringCredentials, Engine, str]_ - Database credentials or an `Engine` instance representing the database connection.
- `table` _str_ - Name of the table or view to load.
- `schema` _Optional[str]_ - Optional name of the schema the table belongs to.
- `metadata` _Optional[MetaData]_ - Optional `sqlalchemy.MetaData` instance. If provided, the `schema` argument is ignored.
- `incremental` _Optional[Incremental[Any]]_ - Option to enable incremental loading for the table. E.g., `incremental=dlt.sources.incremental('updated_at', pendulum.parse('2022-01-01T00:00:00Z'))`
- `chunk_size` _int_ - Number of rows yielded in one batch. SQLAlchemy will create an additional internal row buffer of twice the chunk size.
- `backend` _TableBackend_ - Type of backend used to generate table data. One of: "sqlalchemy", "pyarrow", "pandas", and "connectorx". "sqlalchemy" yields batches as lists of Python dictionaries, "pyarrow" and "connectorx" yield batches as Arrow tables, and "pandas" yields pandas DataFrames. "sqlalchemy" is the default and does not require additional dependencies; "pyarrow" creates stable destination schemas with correct data types; "connectorx" is typically the fastest but ignores `chunk_size`, so you must deal with large tables yourself.
- `detect_precision_hints` _Optional[bool]_ - Deprecated. Use `reflection_level`. Sets column precision and scale hints for supported data types in the target schema based on the columns in the source tables. Disabled by default.
- `reflection_level` _Optional[ReflectionLevel]_ - Specifies how much information should be reflected from the source database schema.
  - `"minimal"` - Only table names, nullability, and primary keys are reflected. Data types are inferred from the data.
  - `"full"` (default) - Data types are reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary.
  - `"full_with_precision"` - Sets precision and scale on supported data types (e.g. decimal, text, binary). Creates big and regular integer types.
- `defer_table_reflect` _Optional[bool]_ - Connect and reflect the table schema only when yielding data. Requires `table_names` to be explicitly passed. Enable this option when running on Airflow and other orchestrators that create execution DAGs. When True, the schema is decided during execution, which may override `query_adapter_callback` modifications or `apply_hints`.
- `table_adapter_callback` _Optional[TTableAdapter]_ - Receives each reflected table. May be used to modify the list of columns that will be selected.
- `backend_kwargs` _Dict[str, Any], optional_ - kwargs passed to the table backend, e.g. "conn" is used to pass a specialized connection string to connectorx.
- `type_adapter_callback` _Optional[TTypeAdapter]_ - Callable to override type inference when reflecting columns. Its argument is a single sqlalchemy data type (a `TypeEngine` instance), and it should return another sqlalchemy data type, or `None` (in which case the type will be inferred from the data).
- `included_columns` _Optional[List[str]]_ - List of column names to select from the table. If not provided, all columns are loaded.
- `excluded_columns` _Optional[List[str]]_ - List of column names to exclude from the select. If not provided, all columns are loaded.
- `query_adapter_callback` _Optional[TQueryAdapter]_ - Callable to override the SELECT query used to fetch data from the table. The callback receives the sqlalchemy `Select` and the corresponding `Table`, `Incremental`, and `Engine` objects, and should return the modified `Select` or a `Text` clause.
- `resolve_foreign_keys` _bool_ - Translate foreign keys in the same schema to `references` table hints. May incur additional database calls, as all referenced tables are reflected.
- `engine_adapter_callback` _Callable[[Engine], Engine]_ - Callback to configure or modify the `Engine` instance that will be used to open a connection, e.g. to set the transaction isolation level.
- `write_disposition` _TWriteDispositionConfig_ - Write disposition of the table resource; defaults to `append`.
- `primary_key` _TColumnNames_ - A list of column names that comprise a primary key. Typically used with the "merge" write disposition to deduplicate loaded data.
- `merge_key` _TColumnNames_ - A list of column names that define a merge key. Typically used with the "merge" write disposition to remove overlapping data ranges, e.g. to keep a single record for a given day.
**Returns**:

- `DltResource` - The dlt resource for loading data from the SQL database table.