Маркетинговые команды в 2026 году принимают решения на основе данных, а не борются с ними. GA4, Meta Ads, Google Ads, CRM, CDP, server-side GTM — всё это падает в разные таблицы. Команда вручную объединяет информацию в электронных таблицах, цифры меняются каждую неделю, никто не доверяет результатам. Этот хаос исчезает с современным data stack'ом: BigQuery как источник, слой трансформации dbt, семантический слой как граф метрик. Вы версионируете код в репозитории, каждое изменение проходит тесты, метрики берутся из единого источника истины. Эта статья показывает, как комбинация dbt + BigQuery превращает маркетинговый pipeline в production-grade систему.
Source mapping: стандартизация сырых данных
Первая задача dbt — source mapping, приведение сырых данных из разных систем к единой схеме. В BigQuery таблица analytics_123456.events_* приходит из GA4, facebook_ads.ads_insights из Meta API, crm.transactions из Shopify. Каждый источник имеет свой формат timestamp'а, разные идентификаторы пользователей, разные названия колонок валюты. Вы определяете эти сырые таблицы в sources.yml:
version: 2
sources:
- name: ga4
database: analytics_123456
tables:
- name: events_
identifier: "events_*"
loaded_at_field: event_timestamp
- name: meta_ads
database: facebook_ads
schema: public
tables:
- name: ads_insights
loaded_at_field: date_start
Это определение говорит dbt: "эти таблицы — upstream источник, я их не трогаю, но проверяю свежесть". Команда dbt source freshness контролирует, когда последний раз пришли данные — если Meta API опаздывает, система отправляет алерт. Без source mapping вы пишете в каждой модели прямое обращение SELECT * FROM analytics_123456.events_20260614, и при изменении названия таблицы ломаются 40 моделей. С mapping'ом ссылка становится {{ source('ga4', 'events_') }}, изменение распространяется из одного места.
GA4 использует Unix microsecond для event_timestamp, Meta Ads — ISO строки для date_start, CRM — UTC datetime для created_at. Source mapping нормализует это: в GA4 TIMESTAMP_MICROS(event_timestamp) AS event_time, в Meta PARSE_TIMESTAMP('%Y-%m-%d', date_start) AS event_time. Эта нормализация обеспечивает чистый входной сигнал для downstream моделей.
Слой моделирования: staging, intermediate, mart
Мощь dbt заключается в слоистом моделировании — staging, intermediate, mart слои. Staging модели берут данные 1:1 из источника, делают только переименование и приведение типов. stg_ga4_events.sql:
SELECT
TIMESTAMP_MICROS(event_timestamp) AS event_time,
user_pseudo_id AS anonymous_id,
event_name,
(SELECT value.string_value FROM UNNEST(event_params) WHERE key = 'session_id') AS session_id,
geo.country,
device.category AS device_category
FROM {{ source('ga4', 'events_') }}
WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY))
AND FORMAT_DATE('%Y%m%d', CURRENT_DATE())
Staging обеспечивает чистые данные, но без бизнес-логики. Intermediate модели добавляют логику: сессионизация, атрибуция, воронка событий. int_sessions.sql группирует GA4 события по сеансам:
WITH session_events AS (
SELECT
session_id,
MIN(event_time) AS session_start,
MAX(event_time) AS session_end,
COUNT(DISTINCT CASE WHEN event_name = 'page_view' THEN event_time END) AS pageviews,
MAX(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) AS converted
FROM {{ ref('stg_ga4_events') }}
GROUP BY session_id
)
SELECT
*,
TIMESTAMP_DIFF(session_end, session_start, SECOND) AS duration_seconds
FROM session_events
Mart модели — финальный слой потребления, на них смотрят BI инструменты, Looker, внутренние дашборды. fct_marketing_performance.sql объединяет все каналы, считает расходы, доход, ROAS. Каждая mart модель сосредоточена на одной бизнес-сущности: dim_customers, fct_orders, fct_sessions. Naming convention в mart критичен — dim_ для dimension (клиент, товар), fct_ для fact (транзакция, событие), rpt_ для report агрегатов.
Семантический слой: KPI как код
Семантический слой переносит определения метрик из таблиц в dbt YAML — "что такое доход", "как считается CAC" теперь не в электронной таблице, а в версионируемом коде. В dbt v1.6+ вы определяете граф метрик в metrics.yml:
version: 2
metrics:
- name: revenue
label: Revenue
model: ref('fct_orders')
calculation_method: sum
expression: order_amount
timestamp: order_date
time_grains: [day, week, month, quarter]
dimensions:
- channel
- country
- device_category
- name: cac
label: Customer Acquisition Cost
calculation_method: derived
expression: "{{ metric('ad_spend') }} / {{ metric('new_customers') }}"
timestamp: acquisition_date
time_grains: [month, quarter]
С семантическим слоем BI инструмент не считает CAC, это делает dbt. Когда Looker запрашивает CAC, dbt возвращает скомпилированный SQL, который join'ит таблицы расходов и новых клиентов, затем делит. Определение — это код, у него есть история в git: "кто изменил формулу CAC, когда и почему". Формула в электронной таблице теряется, здесь — версионируется.
В проектах Roibase семантический слой входит в анализ данных и инженерию внутренних метрик — не только дефиниция метрики, но иерархия KPI, mapping размерностей, стандартизация гранулярности. Пример: метрика "revenue" — это сумма fct_orders.order_amount, но "recognized_revenue" из той же таблицы фильтруется по recognized_at timestamp (для SaaS с подписками). Одна таблица, две метрики, разная бизнес-логика.
Exposures: видимость downstream зависимостей
Exposure — способ dbt ответить на вопрос "кто использует эту модель". Если дашборд Looker смотрит на fct_marketing_performance, вы определяете это в exposures.yml:
version: 2
exposures:
- name: marketing_dashboard
type: dashboard
maturity: high
owner:
name: Growth Team
email: [email protected]
depends_on:
- ref('fct_marketing_performance')
- ref('dim_customers')
description: "Executive marketing dashboard — daily refresh, 90-day rolling window"
url: https://looker.company.com/dashboards/123
Без exposure вы меняете fct_marketing_performance, дашборд в Looker показывает нули, 2 часа отлаживаете проблему. С exposure dbt compile --select +exposure:marketing_dashboard показывает все upstream модели, вы оцениваете влияние изменения до merging.
Exposure не только для BI инструментов — reverse ETL (Hightouch, Census) тоже exposure. Если вы отправляете таблицу customers в Meta CAPI:
exposures:
- name: meta_capi_sync
type: application
maturity: high
depends_on:
- ref('dim_customers')
description: "Meta Conversion API — incremental customer events, 5-minute delay"
Это говорит: "если изменишь схему dim_customers, сломаешь CAPI sync". Production: обновление модели → ошибка CAPI → потеря данных атрибуции. Exposure дает ранний сигнал.
Production pipeline: incremental builds и покрытие тестами
В production dbt не делает full refresh каждый день — использует incremental модели. fct_orders.sql переобрабатывает только последние 3 дня:
{{ config(
materialized='incremental',
unique_key='order_id',
partition_by={'field': 'order_date', 'data_type': 'date'},
cluster_by=['customer_id', 'channel']
) }}
SELECT
order_id,
customer_id,
order_date,
order_amount,
channel
FROM {{ ref('stg_shopify_orders') }}
{% if is_incremental() %}
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)
{% endif %}
Incremental build снижает стоимость BigQuery на 90% — вместо сканирования 2TB сканируете 50GB. Partition + cluster улучшает performance: запрос WHERE customer_id = 'X' идет только в нужный cluster, full scan не нужен.
Покрытие тестами критично. Вы пишете тесты в schema.yml для каждой модели:
models:
- name: fct_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: order_amount
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: order_date
tests:
- dbt_utils.recency:
datepart: day
interval: 7
dbt test выполняет эти условия в BigQuery — если order_amount отрицательный, build падает. В production каждый commit проходит CI/CD: dbt run --select state:modified+ → dbt test --select state:modified+. Запускаются измененные модели + их зависимости downstream, затем тесты, только потом merge allowed.
Оркестрация: Airflow, Prefect, dbt Cloud
dbt сам по себе не оркестратор — вы schedule'ируете его через Airflow или Prefect. Пример Airflow DAG:
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.operators.bash import BashOperator
dbt_run = BashOperator(
task_id='dbt_run',
bash_command='cd /opt/dbt && dbt run --profiles-dir .',
dag=dag
)
dbt_test = BashOperator(
task_id='dbt_test',
bash_command='cd /opt/dbt && dbt test',
dag=dag
)
dbt_run >> dbt_test
Альтернатива — dbt Cloud (managed оркестрация, Web IDE, Slack алерты). Но большинство enterprise выбирают Airflow, потому что параллельно с dbt есть другие task'и: upstream API pull, downstream reverse ETL, snapshot таблицы.
Стратегия schedule'ирования зависит от свежести данных. GA4 задерживается на 24 часа (processing_date ≠ event_date), Meta Ads API не real-time. Staging модели триггерятся свежестью источника — когда GA4 выгружает новую partition, stg_ga4_events refreshes, это распространяется на intermediate → mart цепь. Airflow sensor проверяет наличие новой partition BigQuery:
wait_for_ga4 = BigQueryTableExistenceSensor(
task_id='wait_for_ga4_partition',
project_id='analytics_123456',
dataset_id='events_',
table_id=f"events_{yesterday.strftime('%Y%m%d')}",
poke_interval=300
)
Когда partition готова, dbt chain стартует. Этот паттерн решает проблему late-arriving data — API задержка не блокирует pipeline, она его ждет.
Tradeoffs: что dbt не решает
dbt — это transformation engine, не data loader. Кто загружает данные в BigQuery? Fivetran, Airbyte, custom Python скрипт. dbt предполагает, что сырые данные уже там. Паттерн ELT: Extract-Load-Transform. Отличие от ETL в том, что Transform происходит внутри warehouse'а. dbt отвечает за T, EL — это отдельный toolchain.
dbt не поддерживает real-time streaming. Kafka → BigQuery streaming insert → dbt incremental model chain дает минутную задержку. Для sub-second latency (fraud detection, dynamic pricing) нужны stream processors — Flink, Spark Structured Streaming, Materialize. dbt для этого не подходит.
Поддержка Python моделей в dbt (v1.3+) ограничена. Вы можете манипулировать pandas dataframe'ами, но тяжелый ML training dbt'де не делается. Типичный паттерн: feature engineering в dbt, model training в Vertex AI, inference в BigQuery ML. Python модель dbt:
def model(dbt, session):
df = dbt.ref('stg_orders').to_pandas()
df['log_amount'] = np.log1p(df['order_amount'])
return df