Skip to content
This repository was archived by the owner on Aug 8, 2025. It is now read-only.

Commit 94fffcc

Browse files
committed
Update all plans to use connection builders, flatten structure to make it clear what data source is used
1 parent e2d3318 commit 94fffcc

23 files changed

Lines changed: 296 additions & 117 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ docker/data/custom/generated
3131
docker/data/custom/recordTracking
3232
docker/data/custom/report
3333
docker/sample
34+
docker/tmp
3435

3536
benchmark/jars
3637

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
FROM datacatering/data-caterer:0.13.1
1+
FROM datacatering/data-caterer:0.14.0
22

33
COPY --chown=app:app build/libs/data-caterer-example-0.1.0.jar /opt/app/job.jar
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
name: "simple_http"
2+
steps:
3+
- name: "account"
4+
count:
5+
records: 50
6+
fields:
7+
- name: "httpUrl"
8+
fields:
9+
- name: "url"
10+
static: "http://localhost:80/anything/{id}"
11+
- name: "method"
12+
static: "PUT"
13+
- name: "pathParam"
14+
fields:
15+
- name: "id"
16+
options:
17+
sql: "bodyContent.account_id"
18+
- name: "queryParam"
19+
fields:
20+
- name: "limit"
21+
type: "integer"
22+
options:
23+
oneOf:
24+
- 10
25+
- 5
26+
- name: "httpHeaders"
27+
fields:
28+
- name: "Content-Type"
29+
static: "application/json"
30+
- name: "X-Account-Id"
31+
options:
32+
sql: "bodyContent.account_id"
33+
- name: "X-Updated"
34+
type: "timestamp"
35+
options:
36+
sql: "bodyContent.details.updated_by.time"
37+
- name: "httpBody"
38+
fields:
39+
- name: "account_id"
40+
options:
41+
regex: "ACC[0-9]{8}"
42+
- name: "year"
43+
type: "int"
44+
options:
45+
min: 2021
46+
max: 2022
47+
- name: "amount"
48+
type: "double"
49+
options:
50+
min: 10.0
51+
max: 100.0
52+
- name: "details"
53+
fields:
54+
- name: "name"
55+
- name: "txn_date"
56+
type: "date"
57+
options:
58+
min: "2021-01-01"
59+
max: "2021-12-31"
60+
- name: "updated_by"
61+
fields:
62+
- name: "user"
63+
- name: "time"
64+
type: "timestamp"
65+
- name: "transactions"
66+
type: "array"
67+
fields:
68+
- name: "txn_date"
69+
type: "date"
70+
- name: "amount"
71+
type: "double"
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
name: "simple_kafka"
2+
steps:
3+
- name: "kafka_account"
4+
type: "json"
5+
count:
6+
records: "10"
7+
options:
8+
topic: "account-topic"
9+
fields:
10+
- name: "key"
11+
type: "string"
12+
options:
13+
sql: "content.account_id"
14+
- name: "messageBody"
15+
fields:
16+
- name: "account_id"
17+
- name: "year"
18+
type: "int"
19+
options:
20+
min: "2021"
21+
max: "2022"
22+
- name: "amount"
23+
type: "double"
24+
options:
25+
min: "10.0"
26+
max: "100.0"
27+
- name: "details"
28+
fields:
29+
- name: "name"
30+
- name: "txn_date"
31+
type: "date"
32+
options:
33+
min: "2021-01-01"
34+
max: "2021-12-31"
35+
- name: "updated_by"
36+
fields:
37+
- name: "user"
38+
- name: "time"
39+
type: "timestamp"
40+
- name: "transactions"
41+
type: "array"
42+
fields:
43+
- name: "txn_date"
44+
type: "date"
45+
- name: "amount"
46+
type: "double"
47+
- name: "messageHeaders"
48+
fields:
49+
- name: "account-id"
50+
options:
51+
sql: "content.account_id"
52+
- name: "updated"
53+
options:
54+
sql: "content.details.update_by.time"

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ version=0.1.0
88

99
scalaVersion=2.12
1010
scalaSpecificVersion=2.12.19
11-
dataCatererVersion=0.13.1
11+
dataCatererVersion=0.14.1
1212
sparkMajorVersion=3.5

helm/data-caterer/values.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ image:
88
repository: "datacatering/data-caterer"
99
pullPolicy: "IfNotPresent"
1010
# Overrides the image tag whose default is the chart appVersion.
11-
tag: "0.13.1"
11+
tag: "0.14.0"
1212

1313
imagePullSecrets: []
1414
nameOverride: ""

run.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,20 @@
11
#!/usr/bin/env bash
22

33
data_caterer_version=$(grep dataCatererVersion gradle.properties | cut -d= -f2)
4+
data_caterer_user=${DATA_CATERER_API_USER:-}
5+
data_caterer_token=${DATA_CATERER_API_TOKEN:-}
6+
7+
echo "Checking for Data Caterer user and token..."
8+
if [[ -z ${DATA_CATERER_API_USER} ]]; then
9+
read -p "Data Caterer user: " data_caterer_user
10+
DATA_CATERER_API_USER=data_caterer_user
11+
fi
12+
if [[ -z ${DATA_CATERER_API_TOKEN} ]]; then
13+
read -p "Data Caterer token: " -s data_caterer_token
14+
DATA_CATERER_API_TOKEN=data_caterer_token
15+
echo
16+
fi
17+
418
if [[ -s ".tmp_prev_class_name" ]]; then
519
prev_class_name=$(cat .tmp_prev_class_name)
620
else
@@ -38,11 +52,14 @@ DOCKER_CMD=(
3852
-v "$(pwd)/docker/sample/tracking:/opt/app/record-tracking"
3953
-v "$(pwd)/docker/mount:/opt/app/mount"
4054
-v "$(pwd)/docker/data/custom:/opt/app/custom"
55+
-v "$(pwd)/docker/tmp:/tmp"
4156
-e "APPLICATION_CONFIG_PATH=/opt/app/custom/application.conf"
4257
-e "$full_class_name"
4358
-e "DEPLOY_MODE=client"
4459
-e "DRIVER_MEMORY=2g"
4560
-e "EXECUTOR_MEMORY=2g"
61+
-e "DATA_CATERER_API_USER=$data_caterer_user"
62+
-e "DATA_CATERER_API_TOKEN=$data_caterer_token"
4663
--network "docker_default"
4764
datacatering/data-caterer:"$data_caterer_version"
4865
)

src/main/scala/io/github/datacatering/plan/AdvancedBatchEventPlanRun.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import io.github.datacatering.datacaterer.api.PlanRun
44

55
class AdvancedBatchEventPlanRun extends PlanRun {
66

7-
val kafkaTask = new AdvancedKafkaPlanRun().kafkaTask
7+
val kafkaTask = new KafkaPlanRun().kafkaTask
88

99
val csvTask = csv("my_csv", "/opt/app/data/csv/account")
1010
.fields(

src/main/scala/io/github/datacatering/plan/AdvancedKafkaPlanRun.scala

Lines changed: 0 additions & 52 deletions
This file was deleted.

src/main/scala/io/github/datacatering/plan/AdvancedPlanRun.scala

Lines changed: 35 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -11,63 +11,48 @@ class AdvancedPlanRun extends PlanRun {
1111
val accountIdField = field.name("account_id").regex("ACC[0-9]{8}")
1212
val nameField = field.name("name").expression("#{Name.name}")
1313

14-
val postgresTask = task.name("customer_postgres")
15-
.steps(
16-
step
17-
.name("transaction")
18-
.jdbcTable("account.transaction")
19-
.fields(
20-
accountIdField,
21-
field.name("txn_id").regex("txn_[0-9]{5}"),
22-
field.name("year").`type`(IntegerType).sql("YEAR(date)"),
23-
nameField,
24-
field.name("date").`type`(DateType).min(startDate),
25-
field.name("amount").`type`(DoubleType).max(10000),
26-
field.name("credit_debit").sql("CASE WHEN amount < 0 THEN 'C' ELSE 'D' END"),
27-
)
28-
.count(
29-
count
30-
.recordsPerFieldGenerator(generator.min(1).max(10), "account_id")
31-
)
32-
,
33-
step
34-
.name("account")
35-
.jdbcTable("account.account")
36-
.fields(
37-
accountIdField,
38-
nameField,
39-
field.name("open_date").`type`(DateType).min(startDate),
40-
field.name("status").oneOf("open", "closed", "pending")
41-
)
42-
.count(10)
14+
val postgresTxn = postgres("customer_postgres_txn")
15+
.table("account.transactions")
16+
.fields(
17+
accountIdField,
18+
field.name("txn_id").regex("txn_[0-9]{5}"),
19+
field.name("year").`type`(IntegerType).sql("YEAR(date)"),
20+
nameField,
21+
field.name("date").`type`(DateType).min(startDate),
22+
field.name("amount").`type`(DoubleType).max(10000),
23+
field.name("credit_debit").sql("CASE WHEN amount < 0 THEN 'C' ELSE 'D' END"),
24+
)
25+
.count(
26+
count
27+
.recordsPerFieldGenerator(generator.min(1).max(10), "account_id")
4328
)
29+
val postgresAccount = postgres("customer_postgres_account")
30+
.table("account.account")
31+
.fields(
32+
accountIdField,
33+
nameField,
34+
field.name("open_date").`type`(DateType).min(startDate),
35+
field.name("status").oneOf("open", "closed", "pending")
36+
)
37+
.count(count.records(10))
4438

45-
val jsonTask = task.name("account_json")
46-
.steps(
47-
step
48-
.name("account_info")
49-
.path("src/main/resources/sample/json")
39+
val jsonTask = json("account_json", "src/main/resources/sample/json")
40+
.fields(
41+
accountIdField,
42+
nameField,
43+
field.name("txn_list")
44+
.`type`(ArrayType)
5045
.fields(
51-
accountIdField,
52-
nameField,
53-
field.name("txn_list")
54-
.`type`(ArrayType)
55-
.fields(
56-
field.name("id").sql("_holding_txn_id"),
57-
field.name("date").`type`(DateType).min(startDate),
58-
field.name("amount").`type`(DoubleType),
59-
),
60-
field.name("_holding_txn_id").omit(true)
61-
)
46+
field.name("id").sql("_holding_txn_id"),
47+
field.name("date").`type`(DateType).min(startDate),
48+
field.name("amount").`type`(DoubleType),
49+
),
50+
field.name("_holding_txn_id").omit(true)
6251
)
6352

6453
val conf = configuration.postgres("my_postgres")
6554

6655
val accountPlan = plan.name("Create accounts and transactions across Postgres and JSON file")
67-
.taskSummaries(
68-
taskSummary.dataSource("my_postgres").task(postgresTask),
69-
taskSummary.dataSource("my_json").task(jsonTask)
70-
)
7156
.addForeignKeyRelationship(
7257
foreignField("my_postgres", "transaction", "txn_id"),
7358
foreignField("my_json", "account_info", "_holding_txn_id")
@@ -83,5 +68,5 @@ class AdvancedPlanRun extends PlanRun {
8368
foreignField("my_json", "account_info", "name"),
8469
)
8570

86-
execute(accountPlan, conf)
71+
execute(accountPlan, conf, postgresAccount, postgresTxn, jsonTask)
8772
}

0 commit comments

Comments
 (0)