Enabling multiple dbt runs on your warehouse
The safest way to get parallel processing with multiple dbt processes in your warehouse when those processes read or update data in the same models
In a role in a previous life, I encountered the following use case:-
1. Customers would upload their files.
2. A dbt process would run transformations on tables populated with data from these files. The result of the dbt process was a set of tables holding insights about the data uploaded by the customers.
Now, at point (2), we hit a hurdle. To avoid race conditions, only one dbt process can work on a table at a time, even when using incremental models [1][2].
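To make the incremental-model race concrete, here is a toy Python simulation of one unlucky interleaving. It is only a sketch: the dictionary stands in for the warehouse, the temp table name is chosen for illustration, and the sequence of steps follows the description in the notes at the end of this post rather than dbt's actual SQL.

```python
# Toy model of a warehouse: table name -> list of rows.
# Illustration only; this does not talk to dbt or BigQuery.
warehouse = {"model_a": []}

TEMP = "model_a__dbt_tmp"  # assumption: both runs share one temp table name


def stage_delta(rows):
    """Create (or overwrite) the shared temp table with this run's delta."""
    warehouse[TEMP] = list(rows)


def merge_and_cleanup():
    """Copy the temp table into the model, then drop the temp table."""
    warehouse["model_a"].extend(warehouse[TEMP])  # KeyError if already dropped
    del warehouse[TEMP]


# One unlucky interleaving of two runs, A and B:
stage_delta(["a1", "a2"])  # run A stages its delta
stage_delta(["b1"])        # run B overwrites A's staged rows
merge_and_cleanup()        # run A copies B's rows, then drops the temp table

try:
    merge_and_cleanup()    # run B finds its temp table gone
    run_b_error = None
except KeyError as exc:
    run_b_error = exc

print(warehouse["model_a"])  # run A's own rows never made it into the model
print(repr(run_b_error))     # run B failed on the missing temp table
```

In this interleaving, run A silently loses its rows and run B fails outright, which is why a shared temp table rules out running two processes on the same incremental model.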
Workarounds we tried
Everything is a view
In the first attempt, we just made all our models materialise as views. Theoretically, this sounded incredible. We would have the following benefits:-
1. There is no need to refresh any models; any data from the source tables is directly output to the analysis models.
2. dbt only needed to be run when there were code changes.
3. The output of the analysis models could be cached in views, and those caches could be updated whenever we found an event where new data was added to the source tables.
Unfortunately, we couldn’t get very far with this solution. As soon as we started layering models to get from the source tables to our analysis views, we ran into the following problems:-
1. We could not optimise our queries to use the partitioning and clustering scheme, so we could not limit data scanning to just the delta.
2. As the number of intermediate layers grew, the time required to fetch data from the analysis models increased considerably and, thanks to (1), there was no way to optimise the query in an intermediate model when the analysis view was being run.
3. A problem specific to BigQuery: when querying a stack of nested views, it would just choke and die with the error “Not enough resources to execute query”, despite our using on-demand pricing and placing no limits on slot usage or data scanned [3].
Queueing DBT Jobs
The next option we tried was using a single dbt process, with a queue to process the data. The strategy was simple: we had a service that would listen for requests to run the dbt job and work as follows:-
1. If a client uploaded files and marked them complete, send a request to the service to run the dbt process.
2. If no dbt process is running, just start a new one.
3. If a dbt process is running, add the batch ID to the queue and wait for the dbt process to complete. In the meantime, if we receive more requests to run dbt jobs, we just append those batch IDs to the queue.
4. When the previous dbt process has completed, start a new one for everything in the queue and clear the queue.
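The queueing rules above can be sketched in a few lines of Python. This is a minimal, single-threaded illustration and not the service we actually ran: the class name, the `start_run` hook and the batch IDs are all hypothetical, and a real service would need locking around the shared state.

```python
from typing import Callable, List


class DbtRunCoordinator:
    """Runs at most one dbt process at a time and batches queued requests."""

    def __init__(self, start_run: Callable[[List[str]], None]) -> None:
        self._start_run = start_run  # hypothetical hook that launches dbt
        self._running = False
        self._queue: List[str] = []

    def request_run(self, batch_id: str) -> None:
        """Called when a client marks an upload as complete."""
        if self._running:
            self._queue.append(batch_id)  # wait for the current run to finish
        else:
            self._running = True
            self._start_run([batch_id])

    def on_run_finished(self) -> None:
        """Called when the current dbt process exits."""
        if self._queue:
            batch_ids, self._queue = self._queue, []  # clear the queue
            self._start_run(batch_ids)  # one run covers every queued batch
        else:
            self._running = False


started = []  # records which batches each run was started with
coordinator = DbtRunCoordinator(start_run=started.append)
coordinator.request_run("batch-1")  # nothing running: starts immediately
coordinator.request_run("batch-2")  # queued behind batch-1
coordinator.request_run("batch-3")  # queued as well
coordinator.on_run_finished()       # batch-1 done: the queued batches run together
print(started)
```

Note how the second run picks up both queued batches at once, which is where the resource efficiency of this approach comes from.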
This approach had the following advantages:-
1. The dbt job had all the resources available, since only one job would run at a time.
2. If multiple jobs got queued, they would all be processed together in a single run, so there was no need to run one dbt process per job.
As a developer, I loved this mode. Optimum use of resources.
However, the big deal-breaker here was the unpredictability: you never knew when your job was going to be picked up for processing. Consider this situation: Customer1 has uploaded 50GB of files, which might take 30 minutes to process. Customer2, however, is just trying out the platform and uploads files of just 1MB, which would take approximately 5 minutes to process.
If Customer2 submits their job after Customer1’s job has already started, it will be queued and may have to wait up to 30 minutes just to be picked up. It didn’t make sense to penalise Customer2 just for submitting a job when the platform was busy.
Hence the need to process jobs in parallel. Both jobs share the available BigQuery slots; Customer1’s job may take slightly longer because some slots are consumed by Customer2’s job, but the shorter job doesn’t have to wait for the larger one to complete before its processing begins.
Levelling up with concurrent DBT runs
So, as mentioned above, we needed to run multiple dbt jobs in parallel. To ensure we didn’t shoot ourselves in the foot, I came up with the following strategy:-
1. Make all models purely incremental (append-only: no upserts, so no unique_key setting in the model config).
2. Using environment variables, pass down a batch identifier and append it to every row, along with the dbt run_started_at timestamp.
3. Optional: When accessing dependent models, find a way to ensure two processes avoid using the same rows during calculations. In my case, because each batch of parallel runs belonged to a different customer, I was able to isolate them by just using the batch_id passed via the environment variable in Step (2). As a result, if Model B was dependent on Model A, then when selecting records from Model A, it would only select the records generated in that same batch. This batch_id was an external identifier also present in the source data, so it was possible for each dbt run to be isolated and affect only the rows in its own batch.
4. Create a unique temporary table for each dbt run for each model. This can be achieved by overriding the make_temp_relation macro. A great example is attached to the GitHub issue here. Note that you will also need to add a post-hook to each model to delete the temp tables.
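To show how the four steps above fit together, here is a toy Python simulation. It is a sketch under stated assumptions, not the actual dbt macros: the dictionary stands in for the warehouse, the `__dbt_tmp_<batch_id>` naming scheme and all function names are hypothetical, and in the real setup the staging and cleanup would live in the overridden make_temp_relation macro and the post-hook.

```python
from datetime import datetime, timezone

warehouse = {"model_a": [], "model_b": []}  # toy stand-in for the warehouse


def temp_name(model, batch_id):
    """Step 4: a temp table name unique to this run (naming is an assumption)."""
    return f"{model}__dbt_tmp_{batch_id}"


def stage(model, batch_id, values, run_started_at):
    """Step 2: stage the delta, tagging each row with batch id and timestamp."""
    warehouse[temp_name(model, batch_id)] = [
        {"value": v, "batch_id": batch_id, "run_started_at": run_started_at}
        for v in values
    ]


def merge(model, batch_id):
    """Step 1: append-only copy into the model, then drop this run's temp table."""
    tmp = temp_name(model, batch_id)
    warehouse[model].extend(warehouse[tmp])
    del warehouse[tmp]  # the job of the post-hook in the real setup


def rows_for_batch(model, batch_id):
    """Step 3: a downstream model reads only the rows from its own batch."""
    return [r for r in warehouse[model] if r["batch_id"] == batch_id]


now = datetime.now(timezone.utc)

# Two runs interleave on model_a without touching each other's temp table.
stage("model_a", "cust1", [1, 2], now)
stage("model_a", "cust2", [10], now)
merge("model_a", "cust1")
merge("model_a", "cust2")

# model_b depends on model_a, but the cust2 run only sees cust2's rows.
doubled = [r["value"] * 2 for r in rows_for_batch("model_a", "cust2")]
stage("model_b", "cust2", doubled, now)
merge("model_b", "cust2")
print([r["value"] for r in warehouse["model_b"]])
```

Because every run writes to its own temp table and only ever appends rows tagged with its own batch_id, the order in which the two runs interleave no longer matters.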
Using the above strategy in our sandbox environment, we can now run multiple dbt processes, each targeting a specific batch (as set in the environment variable).
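As a rough idea of what launching those processes could look like, here is a small Python sketch that starts one `dbt run` per batch, each with its own environment. The variable name `DBT_BATCH_ID` and the helper names are assumptions made for this example; the models would read the value with dbt's `env_var` function.

```python
import os
import subprocess
from typing import Callable, Dict, List, Optional, Tuple


def build_run(batch_id: str, base_env: Dict[str, str]) -> Tuple[List[str], Dict[str, str]]:
    """Return the command and environment for one batch-scoped dbt run."""
    env = dict(base_env)            # copy, so runs do not share state
    env["DBT_BATCH_ID"] = batch_id  # hypothetical name, read via env_var() in models
    return ["dbt", "run"], env


def launch_all(
    batch_ids: List[str],
    runner: Callable = subprocess.Popen,
    base_env: Optional[Dict[str, str]] = None,
) -> list:
    """Start one dbt process per batch without waiting for the others."""
    if base_env is None:
        base_env = dict(os.environ)
    processes = []
    for batch_id in batch_ids:
        cmd, env = build_run(batch_id, base_env)
        processes.append(runner(cmd, env=env))
    return processes
```

With dbt installed, something like `for p in launch_all(["cust1", "cust2"]): p.wait()` would start both runs and then wait for them to finish. The `runner` parameter exists only so the launcher can be exercised without actually invoking dbt.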
So did it work?
In the development environment, yes. With the above strategy, I managed to run up to 8 dbt processes in parallel. I did need to increase the number of available slots; otherwise the queries would time out while waiting for slots to become available.
Unfortunately, I could not test this in production, as I was made redundant from the organisation before I could finish applying the above changes to the 400 or so models we had.
[1] If you have two dbt processes running simultaneously on the same model (with table materialisation), you risk a race condition in which only the results of the dbt process that ran last will persist.
In some cloud data warehouses like BigQuery, if one dbt process is modifying a source table for model A while another dbt process is working on model A, you can even get “table not found” errors, because at that very instant the source table was being replaced.
[2] In the case of incremental models, if you have two processes working on the same model, the temp table where the delta is stored makes a hash of things. Depending on how the timing of the processes is interleaved, the first one will create the temp table and populate it with data; the second one will delete all data in the temp table and populate its own data; and then the first one might copy the data over to the main model and delete the temp table. As a result, the second process will encounter an error (as the temp table will already have been deleted when it tries to copy the data to the final table).
[3] Which, as I discovered later, is a very, very bad idea.
