OpsTech Technical Design

Driver Progress Tracker

Context

Progress Tracker is a feature in the driver app that shows available "missions" drivers can complete to earn rewards. These missions share common metadata: a set of conditions over a period. A condition is a measurable metric that must reach a target value, such as Acceptance Rate > 70%.
Behind the scenes, each driver mission is calculated by a single Metabase card, and this approach causes a big issue: to calculate a driver's daily progress on a mission, we have to run a complicated query that consumes all the data for the whole period, so the processing effort is enormous. Another problem is that different drivers have different periods for the same mission, which makes the current pipeline error-prone. In short, measuring mission progress with custom, fragmented SQL code is neither performance-optimized nor management-optimized.

Solution

We need to build a new processing engine that improves processing performance and reduces logic errors.
To do so, we will rebuild the data structure and data pipeline.
Data Structure

Name             | Structure                                                         | Sample
mission          | (id, metrics, conditions)                                         | id: P1; metrics: [total_accepted, total_recommend]; conditions: [AR: "total_accepted/total_recommend > 0.7"]
mission_supplier | (mission_id, supplier_id, from_date, to_date, conditions_values)  | mission_id: P1; from_date: 1/10/2021; to_date: 10/10/2021; conditions_values: [(AR, 0.6, true), ()]
metric           | (id, default_value, formula, type, reset_scope)                   | id: total_accepted; default_value: 0; formula: {type: python, value: 1}
metric_supplier  | (date, supplier_id, metric_id, value)                             | supplier_id: 84987654; date: 1/10/2021; metric: total_accepted; value: 1

Diagram

[Diagram image: Screen Shot 2021-09-24 at 4.50.36 PM.png]
We have three concepts backed by two configuration tables: metric (table metric) > condition > mission (table mission). For example, mission A has the condition {name: AR, formula: "(total_accept/total_recommend)", val: "0.7", type: "GT"}, which involves two metrics: [total_accept, total_recommend].
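To make the condition semantics concrete, below is a minimal python sketch of evaluating such a condition against a driver's metric values. The helper names and the eval-based formula handling are illustrative assumptions, not the production implementation.

# Minimal sketch: evaluate a {name, formula, val, type} condition against
# metric values. eval() on the stored formula string is an assumption here.
OPERATORS = {
    "GT": lambda a, b: a > b,
    "GTE": lambda a, b: a >= b,
    "LT": lambda a, b: a < b,
    "LTE": lambda a, b: a <= b,
}

def evaluate_condition(condition, metric_values):
    # Metric names in the formula resolve to the driver's current values.
    current = eval(condition["formula"], {}, metric_values)
    passed = OPERATORS[condition["type"]](current, float(condition["val"]))
    return (condition["name"], current, passed)

# Mission A's condition, with sample metric values:
ar = {"name": "AR", "formula": "(total_accept/total_recommend)", "val": "0.7", "type": "GT"}
print(evaluate_condition(ar, {"total_accept": 8, "total_recommend": 10}))
# -> ('AR', 0.8, True)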

Data Pipeline

Logical Steps

find orders in final_state (completed/canceled)
for each order ⇒ find supplier_id + mission_supplier + metric_supplier:
1. reset metrics [total_accepted(0), total_recommend(0), online(false), count_in_district_9(0)]
2. compute metrics: for each metric
⇒ compute metric A by metabase_card(order_id) | python code | single value
⇒ update into metric_supplier (date, supplier_id, metric_id, value)
3. compute mission in mission_supplier
⇒ update into mission_supplier (supplier_id, mission_id, conditions_values)
A sketch of this loop follows below.
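A minimal python sketch of these steps; every helper below (find_active_mission_suppliers, compute_metric, upsert_metric_supplier, recompute_mission_supplier) is an assumed name for illustration, not real code.

# Sketch of the logical steps; all helpers are assumptions.
def process_final_state_orders(orders, current_date):
    for order in orders:
        supplier_id = order["supplier_id"]
        # find the driver's active missions (mission_supplier rows)
        for mission in find_active_mission_suppliers(supplier_id, current_date):
            for metric_id in mission["metrics"]:
                # metabase_card(order_id) | python code | single value
                value = compute_metric(metric_id, order)
                upsert_metric_supplier(current_date, supplier_id, metric_id, value)
            # recompute conditions_values from the fresh metric values
            recompute_mission_supplier(supplier_id, mission["mission_id"])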

Sample Step

order ABCXYZ
⇒ supplier_id: 84987654
⇒ has an active mission_supplier (mission_id = M)
⇒ mission M has metrics: [total_accepted, total_recommend]
1. reset metrics [total_accepted(0), total_recommend(0)] in the metric_supplier table
2. calculate each metric:
⇒ metric total_accepted (formula = 1) ⇒ processed value = 1 ⇒ total_accepted + 1
⇒ update into metric_supplier (date: 1/10/2021, supplier_id: 84987654, metric: total_accepted, value: 1)
(same for total_recommend)
3. calculate conditions_values in mission_supplier:
⇒ mission M has condition {name: AR, formula: "(total_accept/total_recommend)", val: "0.7", type: "GT"}
⇒ update into mission_supplier: conditions_values [(AR, 0.6, true)]

Data schema

metric:
  id: "total_rec"
  description: "Number of times the driver was recommended + auto-assigned"
  calculation: {
    "data_type": "float | double precision",
    "method": "metabase" | "single_value",
    "metabase_card_id": // required for metabase method (required params: order_id)
    "value": 1, // required for single_value method
    "default_value": 0
  }

metric_supplier:
  id: autogenerated
  supplier_id:
  metric_id:
  date:
  value:
  condition_id: ???
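A python sketch of dispatching on calculation.method per the schema above; the Metabase call is stubbed because the real client is out of scope here, and the function names are illustrative.

# Dispatch on metric.calculation.method, following the schema above.
def query_metabase_card(card_id, order_id):
    # Stub: the real Metabase client call is out of scope for this sketch.
    raise NotImplementedError

def calculate_metric_value(calculation, order_id):
    if calculation["method"] == "single_value":
        # fixed increment, e.g. value: 1 for counters like total_accepted
        return calculation["value"]
    if calculation["method"] == "metabase":
        # run the configured card with order_id as its required param
        return query_metabase_card(calculation["metabase_card_id"], order_id)
    # unknown method: fall back to the configured default
    return calculation.get("default_value", 0)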

Cron_Job (scheduled job): airflow (python)

Scan all orders in current_date in final state (status = COMPLETED | CANCELED, supplier_id <> NULL, service_id not containing 'PARTNER').
For each order and order.supplier_id:
  for each metric in the metric table:
    get the metric_supplier record by (supplier_id, metric_id, date = current_date)
    - handle the case where no metric_supplier record exists yet
    based on the metric.calculation configuration, calculate metric_supplier.value
    upsert metric_supplier.value into the database
Estimate: 150k orders x 10 metrics = 1.5M rows per day.
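As a rough shape for this job, a sketch assuming Airflow 2.x; fetch_final_state_orders, fetch_all_metrics, and upsert_metric_supplier are assumed helpers matching the steps above, and calculate_metric_value is the dispatch sketched under Data schema.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def run_daily_metrics(ds, **_):
    # ds = the run's date string (YYYY-MM-DD), injected by Airflow
    for order in fetch_final_state_orders(ds):   # assumed helper
        for metric in fetch_all_metrics():       # assumed helper
            value = calculate_metric_value(metric["calculation"], order["id"])
            upsert_metric_supplier(ds, order["supplier_id"], metric["id"], value)

with DAG(
    dag_id="driver_progress_tracker_metrics",
    start_date=datetime(2021, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="compute_metric_supplier", python_callable=run_daily_metrics)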
APIs:

Tasking

[x] Create database tables in schema public (Công - data team)
[x] Ops sends metric list to insert into table metric (Nam + Công, data team)
[x] Build airflow job (Nghĩa with data team)
[x] Run airflow job (Nghĩa with data team)
[ ] Ops verifies metric_supplier data (Nam)
[ ] Nghĩa develops admin page to manage metrics


The airflow job scans the day's final-state orders with the following query (the two %s placeholders are the start and end of current_date):

select id, supplier_id from "order"
where status in ('COMPLETED','CANCELLED')
and order_time between %s and %s
and supplier_id is not null
and service_id not like ('%PARTNER%')
order by supplier_id ASC
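For reference, a sketch of binding the two %s placeholders to the day's boundaries, assuming psycopg2; the connection string is illustrative.

import psycopg2

# The query above, with literal %'s doubled so psycopg2 does not
# treat them as placeholders.
ORDER_SCAN_SQL = """
select id, supplier_id from "order"
where status in ('COMPLETED','CANCELLED')
and order_time between %s and %s
and supplier_id is not null
and service_id not like ('%%PARTNER%%')
order by supplier_id ASC
"""

conn = psycopg2.connect("dbname=orders")  # illustrative connection
with conn.cursor() as cur:
    cur.execute(ORDER_SCAN_SQL, ("2021-10-01 00:00:00", "2021-10-01 23:59:59"))
    orders = cur.fetchall()  # [(id, supplier_id), ...] ordered by supplier_id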

Edge cases:

1. Rating:

An order-update event can change 2 fields, cancel_comment or rating_by_user, where order_time < current_date ⇒ SOLUTION: use order-streaming-filter to handle it. Two cases can occur:
1. An Aha staff member deletes the rating.
2. The customer rates again (this depends on the customer, so it may or may not happen).
So when such an event arrives:
1. Rating removal:
+) bad_rating:
   if the old rating_by_user < 5 ⇒ -1
   if the old rating_by_user = 5 ⇒ 0
+) rating_point ⇒ subtract the old rating_by_user
+) total_rating ⇒ -1
2. Customer rates again:
The affected metrics are re-triggered and updated following the original flow (a sketch of the removal case follows below).
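A python sketch of the rating-removal adjustments above; adjust_metric (adding a delta to metric_supplier) is an assumed helper name.

def handle_rating_removal(supplier_id, date, old_rating):
    # bad_rating: only decremented if the removed rating was a bad one (< 5)
    if old_rating < 5:
        adjust_metric(date, supplier_id, "bad_rating", -1)
    # rating_point: subtract the old rating_by_user
    adjust_metric(date, supplier_id, "rating_point", -old_rating)
    # total_rating: one fewer rating
    adjust_metric(date, supplier_id, "total_rating", -1)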

2. Complete/cancel order after order date (T+x):

Solution:

(1) A pg trigger writes such events into a new table, and we then process that table. Trigger conditions:
1. rating_by_user updated where order_date < current_date
2. cancel_comment updated where order_date < current_date
3. order_status IN (COMPLETED, CANCELLED) where order_date < current_date

metric_event (id, type, order_id, data, time), indexed by time ascending. Event payloads:
RATING_REMOVAL, order_id, old_rating_by_user, time, supplier_id
RATING_UPDATE, order_id, new_rating_by_user, time, supplier_id
CANCEL_COMMENT_UPDATE, order_id, cancel_comment, time, supplier_id
STATUS_UPDATE, order_id, old_status, new_status, time, supplier_id

(2) An airflow job processes metric_event daily. Each event type updates the metrics it affects:
RATING_REMOVAL: bad_rating, rating_point, total_rating
RATING_UPDATE: bad_rating, rating_point, total_rating
CANCEL_COMMENT_UPDATE: total_cancel
STATUS_UPDATE: total_order
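A python sketch of step (2): scan metric_event in time order and recompute only the metrics each event type affects. recompute_metric is an assumed helper, not real code.

# Event type -> metrics it can change, per the mapping above.
EVENT_METRICS = {
    "RATING_REMOVAL": ["bad_rating", "rating_point", "total_rating"],
    "RATING_UPDATE": ["bad_rating", "rating_point", "total_rating"],
    "CANCEL_COMMENT_UPDATE": ["total_cancel"],
    "STATUS_UPDATE": ["total_order"],
}

def process_metric_events(events):
    # metric_event rows, already ordered by the ascending time index
    for event in events:
        for metric_id in EVENT_METRICS[event["type"]]:
            recompute_metric(event["supplier_id"], metric_id, event)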
