GCP (dagster-gcp)
BigQuery
BigQuery Resource
- dagster_gcp.BigQueryResource ResourceDefinition [source]
- Resource for interacting with Google BigQuery. - Examples: - from dagster import Definitions, asset
 from dagster_gcp import BigQueryResource
 @asset
 def my_table(bigquery: BigQueryResource):
 with bigquery.get_client() as client:
 client.query("SELECT * FROM my_dataset.my_table")
 defs = Definitions(
 assets=[my_table],
 resources={
 "bigquery": BigQueryResource(project="my-project")
 }
 )
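- The query call in this example returns a google-cloud-bigquery QueryJob. As a hedged follow-up sketch (not part of the documented example), you can wait on the job and load the rows into a DataFrame with the client library's to_dataframe() method:
 from dagster import asset
 from dagster_gcp import BigQueryResource

 @asset
 def my_table_df(bigquery: BigQueryResource):
     # to_dataframe() blocks until the query finishes; it requires pandas
     # (and the db-dtypes package for some column types) to be installed
     with bigquery.get_client() as client:
         return client.query("SELECT * FROM my_dataset.my_table").to_dataframe()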
BigQuery I/O Manager
- dagster_gcp.BigQueryIOManager IOManagerDefinition [source]
- Base class for an I/O manager definition that reads inputs from and writes outputs to BigQuery. - Examples: - from dagster_gcp import BigQueryIOManager
 from dagster_gcp_pandas import BigQueryPandasTypeHandler
 from dagster import Definitions, EnvVar, asset
 from dagster._core.storage.db_io_manager import DbTypeHandler
 from typing import Sequence
 import pandas as pd
 class MyBigQueryIOManager(BigQueryIOManager):
 @staticmethod
 def type_handlers() -> Sequence[DbTypeHandler]:
 return [BigQueryPandasTypeHandler()]
 @asset(
 key_prefix=["my_dataset"] # my_dataset will be used as the dataset in BigQuery
 )
 def my_table() -> pd.DataFrame: # the name of the asset will be the table name
 ...
 defs = Definitions(
 assets=[my_table],
 resources={
 "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
 }
 )- You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op. - defs = Definitions(
 assets=[my_table],
 resources={
 "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"), dataset="my_dataset")
 }
 )- On individual assets, you can also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence. - @asset(
 key_prefix=["my_dataset"] # will be used as the dataset in BigQuery
 )
 def my_table() -> pd.DataFrame:
 ...
 @asset(
 # note that the key needs to be "schema"
 metadata={"schema": "my_dataset"} # will be used as the dataset in BigQuery
 )
 def my_other_table() -> pd.DataFrame:
 ...- For ops, the dataset can be specified by including a “schema” entry in output metadata. - @op(
 out={"my_table": Out(metadata={"schema": "my_schema"})}
 )
 def make_my_table() -> pd.DataFrame:
 ...- If none of these is provided, the dataset will default to “public”. - To only use specific columns of a table as input to a downstream op or asset, add the metadata columns to the In or AssetIn. - @asset(
 ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
 )
 def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
 # my_table will just contain the data from column "a"
 ...- If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the gcp_credentials configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64-encoded to avoid issues with newlines in the key. You can retrieve the base64-encoded key with this shell command: - cat $GOOGLE_APPLICATION_CREDENTIALS | base64
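- As a hedged sketch of the above, reusing MyBigQueryIOManager and my_table from the earlier example, the base64-encoded key can be passed in through an environment variable (the GCP_CREDS variable name below is just an example):
 from dagster import Definitions, EnvVar

 defs = Definitions(
     assets=[my_table],
     resources={
         "io_manager": MyBigQueryIOManager(
             project=EnvVar("GCP_PROJECT"),
             # output of: cat $GOOGLE_APPLICATION_CREDENTIALS | base64
             gcp_credentials=EnvVar("GCP_CREDS"),
         )
     },
 )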
BigQuery Ops
- dagster_gcp.bq_create_dataset [source]
- BigQuery Create Dataset. - This op encapsulates creating a BigQuery dataset. - Expects a BQ client to be provisioned in resources as context.resources.bigquery. 
- dagster_gcp.bq_delete_dataset [source]
- BigQuery Delete Dataset. - This op encapsulates deleting a BigQuery dataset. - Expects a BQ client to be provisioned in resources as context.resources.bigquery. 
- dagster_gcp.bq_op_for_queries [source]
- Executes BigQuery SQL queries. - Expects a BQ client to be provisioned in resources as context.resources.bigquery. 
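- As a minimal sketch of wiring these ops (assuming the bigquery resource key is supplied by dagster_gcp's legacy bigquery_resource, which provides a BigQuery client), bq_op_for_queries turns a list of SQL strings into an op that runs them against context.resources.bigquery:
 from dagster import job
 from dagster_gcp import bigquery_resource, bq_op_for_queries

 # build an op that executes these queries in order
 run_queries = bq_op_for_queries(
     ["SELECT 1", "SELECT * FROM my_dataset.my_table"]
 )

 @job(resource_defs={"bigquery": bigquery_resource})
 def my_bq_job():
     run_queries()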
Data Freshness
- dagster_gcp.fetch_last_updated_timestamps [source]
- Get the last updated timestamps of a list of BigQuery tables. - Note that this only works on BigQuery tables, not views. - Parameters: - client (bigquery.Client) – The BigQuery client.
- dataset_id (str) – The BigQuery dataset ID.
- table_ids (Sequence[str]) – The table IDs to get the last updated timestamp for.
 - Returns: A mapping of table IDs to their last updated timestamps (UTC). Return type: Mapping[str, datetime]
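- A hedged usage sketch combining this with the BigQueryResource shown earlier (the dataset and table names are placeholders):
 from dagster import asset
 from dagster_gcp import BigQueryResource, fetch_last_updated_timestamps

 @asset
 def freshness_report(bigquery: BigQueryResource) -> dict:
     with bigquery.get_client() as client:
         # returns a Mapping[str, datetime] keyed by table ID, timestamps in UTC
         last_updated = fetch_last_updated_timestamps(
             client=client,
             dataset_id="my_dataset",
             table_ids=["my_table", "my_other_table"],
         )
     return {table: ts.isoformat() for table, ts in last_updated.items()}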
GCS
GCS Resource
- dagster_gcp.GCSResource ResourceDefinition [source]
- Resource for interacting with Google Cloud Storage. - Example: - @asset
 def my_asset(gcs: GCSResource):
 client = gcs.get_client()
 # client is a google.cloud.storage.Client
 ...
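- A minimal registration sketch for the example above (the bucket, blob key, and project names are placeholders):
 from dagster import Definitions, asset
 from dagster_gcp import GCSResource

 @asset
 def my_asset(gcs: GCSResource):
     # client is a google.cloud.storage.Client
     client = gcs.get_client()
     client.bucket("my-bucket").blob("my-key").upload_from_string("hello")

 defs = Definitions(
     assets=[my_asset],
     resources={"gcs": GCSResource(project="my-project")},
 )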
GCS I/O Manager
- dagster_gcp.GCSPickleIOManager IOManagerDefinition [source]
- Persistent IO manager using GCS for storage. - Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for GCS and the backing bucket. - Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at <base_dir>/<asset_key>. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir. - Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of /my/base/path, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called three in a directory with path /my/base/path/one/two/. - Example usage: - Attach this IO manager to a set of assets.
from dagster import asset, Definitions
 from dagster_gcp.gcs import GCSPickleIOManager, GCSResource
 @asset
 def asset1():
 # create df ...
 return df
 @asset
 def asset2(asset1):
 return asset1[:5]
 Definitions(
 assets=[asset1, asset2],
 resources={
 "io_manager": GCSPickleIOManager(
 gcs_bucket="my-cool-bucket",
 gcs_prefix="my-cool-prefix",
 gcs=GCSResource(project="my-cool-project")
 ),
 }
 )
- Attach this IO manager to your job to make it available to your ops.
from dagster import job
 from dagster_gcp.gcs import GCSPickleIOManager, GCSResource
 @job(
 resource_defs={
 "io_manager": GCSPickleIOManager(
 gcs=GCSResource(project="my-cool-project"),
 gcs_bucket="my-cool-bucket",
 gcs_prefix="my-cool-prefix"
 ),
 }
 )
 def my_job():
 ...
 
GCS Sensor
- dagster_gcp.gcs.sensor.get_gcs_keys [source]
- Return a list of updated keys in a GCS bucket. - Parameters: - bucket (str) – The name of the GCS bucket.
- prefix (Optional[str]) – The prefix to filter the keys by.
- since_key (Optional[str]) – The key to start from. If provided, only keys updated after this key will be returned.
- gcs_session (Optional[google.cloud.storage.client.Client]) – A GCS client session. If not provided, a new session will be created.
 - Returns: A list of keys in the bucket, sorted by update time, that are newer than the since_key. Return type: List[str] Example: - from dagster import RunRequest, SkipReason, resource, sensor
 from dagster_gcp.gcs.sensor import get_gcs_keys
 from google.cloud import storage
 @resource
 def google_cloud_storage_client(context):
 return storage.Client().from_service_account_json("my-service-account.json")
 @sensor(job=my_job, required_resource_keys={"google_cloud_storage_client"})
 def my_gcs_sensor(context):
 since_key = context.cursor or None
 new_gcs_keys = get_gcs_keys(
 "my-bucket",
 prefix="data",
 since_key=since_key,
 gcs_session=context.resources.google_cloud_storage_client
 )
 if not new_gcs_keys:
 return SkipReason("No new gcs files found for bucket 'my-bucket'.")
 for gcs_key in new_gcs_keys:
 yield RunRequest(run_key=gcs_key, run_config={
 "ops": {
 "gcs_files": {
 "config": {
 "gcs_key": gcs_key
 }
 }
 }
 })
 last_key = new_gcs_keys[-1]
 context.update_cursor(last_key)
File Manager
- class dagster_gcp.GCSFileHandle [source]
- A reference to a file on GCS. 
- dagster_gcp.GCSFileManagerResource ResourceDefinition [source]
- FileManager that provides abstract access to GCS. 
GCS Compute Log Manager
- class dagster_gcp.gcs.GCSComputeLogManager [source]
- Logs op compute function stdout and stderr to GCS. - Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following: - compute_logs:
 module: dagster_gcp.gcs.compute_log_manager
 class: GCSComputeLogManager
 config:
 bucket: "mycorp-dagster-compute-logs"
 local_dir: "/tmp/cool"
 prefix: "dagster-test-"
 upload_interval: 30- There are more configuration examples in the instance documentation guide: https://docs.dagster.io/deployment/oss/oss-instance-configuration#compute-log-storage - Parameters: - bucket (str) – The name of the GCS bucket to which to log.
- local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default: dagster_shared.seven.get_system_temp_directory().
- prefix (Optional[str]) – Prefix for the log file keys.
- json_credentials_envvar (Optional[str]) – Environment variable that contains the JSON with a private key and other credentials information. If this is set, GOOGLE_APPLICATION_CREDENTIALS will be ignored. Can be used when the private key cannot be used as a file.
- upload_interval – (Optional[int]): Interval in seconds to upload partial log files to GCS. By default, will only upload when the capture is complete.
- show_url_only – (Optional[bool]): Only show the URL of the log file in the UI, instead of fetching and displaying the full content. Default False.
- inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager when instantiated from config.
 
Dataproc
Dataproc Resource
- dagster_gcp.DataprocResource ResourceDefinition [source]
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases. Resource for connecting to a Dataproc cluster. Example: - @asset
 def my_asset(dataproc: DataprocResource):
 with dataproc.get_client() as client:
 # client is a dagster_gcp.DataprocClient
 ...
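- A hedged registration sketch; the project_id, region, and cluster_name configuration fields and their values below are illustrative assumptions for connecting to an existing cluster:
 from dagster import Definitions, asset
 from dagster_gcp import DataprocResource

 @asset
 def my_asset(dataproc: DataprocResource):
     with dataproc.get_client() as client:
         # client is a dagster_gcp.DataprocClient
         ...

 defs = Definitions(
     assets=[my_asset],
     resources={
         "dataproc": DataprocResource(
             project_id="my-project",
             region="us-central1",
             cluster_name="my-cluster",
         )
     },
 )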
Dataproc Ops
- dagster_gcp.dataproc_op = <dagster._core.definitions.op_definition.OpDefinition object> [source]
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
Pipes
Clients
- classdagster_gcp.pipes.PipesDataprocJobClient [source]
- preview: This API is currently in preview, and may have breaking changes in patch version releases. This API is not considered ready for production use. A pipes client for running workloads on GCP Dataproc in Job mode. Parameters: - client (Optional[google.cloud.dataproc_v1.JobControllerClient]) – The GCP Dataproc client to use.
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the GCP Dataproc job. Defaults to PipesEnvContextInjector.
- message_reader (PipesMessageReader) – A message reader to use to read messages from the GCP Dataproc job. For example, PipesGCSMessageReader.
- forward_termination (bool) – Whether to cancel the GCP Dataproc job if the Dagster process receives a termination signal.
- poll_interval (float) – The interval in seconds to poll the GCP Dataproc job for status updates. Defaults to 5 seconds.
 - run [source]
- Run a job on GCP Dataproc, enriched with the pipes protocol. - Parameters: - context (Union[OpExecutionContext, AssetExecutionContext]) – The context of the currently executing Dagster op or asset.
- submit_job_params (SubmitJobParams) – Parameters for the JobControllerClient.submit_jobcall. See Google Cloud SDK Documentation
- extras (Optional[Dict[str, Any]]) – Additional information to pass to the Pipes session in the external process.
 - Returns: Wrapper containing results reported by the external process. Return type: PipesClientCompletedInvocation
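- A hedged sketch of constructing the client and calling run() from an asset. The nesting of submit_job_params under a request key, and the project, region, cluster, and script URI values, are assumptions that mirror the JobControllerClient.submit_job request shape:
 from dagster import AssetExecutionContext, Definitions, asset
 from dagster_gcp.pipes import (
     PipesDataprocJobClient,
     PipesGCSContextInjector,
     PipesGCSMessageReader,
 )
 from google.cloud import dataproc_v1, storage

 @asset
 def dataproc_pipes_asset(
     context: AssetExecutionContext, pipes_dataproc_client: PipesDataprocJobClient
 ):
     return pipes_dataproc_client.run(
         context=context,
         submit_job_params={
             "request": {
                 "project_id": "my-project",
                 "region": "us-central1",
                 "job": {
                     "placement": {"cluster_name": "my-cluster"},
                     "pyspark_job": {
                         "main_python_file_uri": "gs://my-bucket/my_script.py"
                     },
                 },
             }
         },
     ).get_materialize_result()

 gcs_client = storage.Client()
 defs = Definitions(
     assets=[dataproc_pipes_asset],
     resources={
         "pipes_dataproc_client": PipesDataprocJobClient(
             # in practice the JobControllerClient usually needs a regional
             # endpoint passed via client_options
             client=dataproc_v1.JobControllerClient(),
             message_reader=PipesGCSMessageReader(bucket="my-bucket", client=gcs_client),
             context_injector=PipesGCSContextInjector(bucket="my-bucket", client=gcs_client),
         )
     },
 )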
 
Context Injectors
- classdagster_gcp.pipes.PipesGCSContextInjector [source]
- A context injector that injects context by writing to a temporary GCS location. - Parameters: - bucket (str) – The GCS bucket to write to.
- client (google.cloud.storage.Client) – A Google Cloud SDK client to use to write to GCS.
- key_prefix (Optional[str]) – An optional prefix to use for the GCS key. Will be concatenated with a random string.
 
Message Readers
- classdagster_gcp.pipes.PipesGCSMessageReader [source]
- Message reader that reads messages by periodically reading message chunks from a specified GCS bucket. - If log_readers is passed, this reader will also start the passed readers when the first message is received from the external process. - Parameters: - interval (float) – interval in seconds between attempts to download a chunk
- bucket (str) – The GCS bucket to read from.
- client (Optional[cloud.google.storage.Client]) – The GCS client to use.
- log_readers (Optional[Sequence[PipesLogReader]]) – A set of log readers for logs on GCS.
- include_stdio_in_messages (bool) – Whether to send stdout/stderr to Dagster via Pipes messages. Defaults to False.
 
Legacy
- dagster_gcp.ConfigurablePickledObjectGCSIOManager IOManagerDefinition [source]
- deprecated: This API will be removed in version 2.0. Please use GCSPickleIOManager instead. Renamed to GCSPickleIOManager. See GCSPickleIOManager for documentation.
- dagster_gcp.build_bigquery_io_manager IOManagerDefinition [source]
- Builds an I/O manager definition that reads inputs from and writes outputs to BigQuery. - Parameters: - type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between slices of BigQuery tables and an in-memory type - e.g. a Pandas DataFrame. If only one DbTypeHandler is provided, it will be used as the default_load_type.
- default_load_type (Type) – When an input has no type annotation, load it as this type.
 - Returns: IOManagerDefinition Examples: - from dagster_gcp import build_bigquery_io_manager
 from dagster_gcp_pandas import BigQueryPandasTypeHandler
 from dagster import Definitions, asset
 import pandas as pd
 @asset(
 key_prefix=["my_prefix"],
 metadata={"schema": "my_dataset"} # will be used as the dataset in BigQuery
 )
 def my_table() -> pd.DataFrame: # the name of the asset will be the table name
 ...
 @asset(
 key_prefix=["my_dataset"] # my_dataset will be used as the dataset in BigQuery
 )
 def my_second_table() -> pd.DataFrame: # the name of the asset will be the table name
 ...
 bigquery_io_manager = build_bigquery_io_manager([BigQueryPandasTypeHandler()])
 Definitions(
 assets=[my_table, my_second_table],
 resources={
 "io_manager": bigquery_io_manager.configured({
 "project" : {"env": "GCP_PROJECT"}
 })
 }
 )- You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op. - Definitions(
 assets=[my_table],
 resources={
 "io_manager": bigquery_io_manager.configured({
 "project" : {"env": "GCP_PROJECT"}
 "dataset": "my_dataset"
 })
 }
 )- On individual assets, you can also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence. - @asset(
 key_prefix=["my_dataset"] # will be used as the dataset in BigQuery
 )
 def my_table() -> pd.DataFrame:
 ...
 @asset(
 # note that the key needs to be "schema"
 metadata={"schema": "my_dataset"} # will be used as the dataset in BigQuery
 )
 def my_other_table() -> pd.DataFrame:
 ...- For ops, the dataset can be specified by including a “schema” entry in output metadata. - @op(
 out={"my_table": Out(metadata={"schema": "my_schema"})}
 )
 def make_my_table() -> pd.DataFrame:
 ...- If none of these is provided, the dataset will default to “public”. - To only use specific columns of a table as input to a downstream op or asset, add the metadata columns to the In or AssetIn. - @asset(
 ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
 )
 def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
 # my_table will just contain the data from column "a"
 ...- If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the gcp_credentials configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64-encoded to avoid issues with newlines in the key. You can retrieve the base64-encoded key with this shell command: - cat $GOOGLE_APPLICATION_CREDENTIALS | base64
- dagster_gcp.gcs_pickle_io_manager IOManagerDefinition [source]
- Persistent IO manager using GCS for storage. - Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for GCS and the backing bucket. - Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at <base_dir>/<asset_key>. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir. - Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of /my/base/path, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called three in a directory with path /my/base/path/one/two/. - Example usage: - Attach this IO manager to a set of assets.
from dagster import Definitions, asset
 from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource
 @asset
 def asset1():
 # create df ...
 return df
 @asset
 def asset2(asset1):
 return asset1[:5]
 Definitions(
 assets=[asset1, asset2],
 resources={
 "io_manager": gcs_pickle_io_manager.configured(
 {"gcs_bucket": "my-cool-bucket", "gcs_prefix": "my-cool-prefix"}
 ),
 "gcs": gcs_resource.configured({"project": "my-cool-project"}),
 },
 )
- Attach this IO manager to your job to make it available to your ops.
from dagster import job
 from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource
 @job(
 resource_defs={
 "io_manager": gcs_pickle_io_manager.configured(
 {"gcs_bucket": "my-cool-bucket", "gcs_prefix": "my-cool-prefix"}
 ),
 "gcs": gcs_resource.configured({"project": "my-cool-project"}),
 },
 )
 def my_job():
 ...
 
- dagster_gcp.gcs_file_manager ResourceDefinition [source]
- FileManager that provides abstract access to GCS. - Implements the FileManager API.
- dagster_gcp.dataproc_resource ResourceDefinition [source]
- beta: This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.