Skip to content

Commit c1bdfbb

Browse files
committed
feat: optimus support maxcompute
1 parent 2f73a1f commit c1bdfbb

File tree

7 files changed

+811
-175
lines changed

7 files changed

+811
-175
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ config.yaml
2626
meteor
2727
_recipes
2828
meteor.yaml
29+
vendor
2930

3031
# plugins
3132
meteor-plugin-*

.gitignore.bak

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Binaries for programs and plugins
2+
*.exe
3+
*.exe~
4+
*.dll
5+
*.so
6+
*.dylib
7+
8+
.DS_Store
9+
10+
# Test binary, built with `go test -c`
11+
*.test
12+
13+
# Output of the go coverage tool, specifically when used with LiteIDE
14+
*.out
15+
16+
# Dependency directories (remove the comment below to include it)
17+
# vendor/
18+
19+
# IDEs
20+
.idea
21+
.vscode
22+
23+
# Project specific ignore
24+
.env
25+
config.yaml
26+
meteor
27+
_recipes
28+
meteor.yaml
29+
30+
# plugins
31+
meteor-plugin-*
32+
33+
# build
34+
/dist
35+
36+
.playground
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
package optimus
2+
3+
import (
4+
"context"
5+
_ "embed" // used to print the embedded assets
6+
"errors"
7+
"fmt"
8+
"strings"
9+
10+
"github.com/raystack/meteor/models"
11+
v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
12+
"github.com/raystack/meteor/plugins"
13+
"github.com/raystack/meteor/plugins/extractors/optimus/client"
14+
"github.com/raystack/meteor/registry"
15+
"github.com/raystack/meteor/utils"
16+
pb "github.com/raystack/optimus/protos/raystack/optimus/core/v1beta1"
17+
"github.com/raystack/salt/log"
18+
"google.golang.org/protobuf/types/known/anypb"
19+
)
20+
21+
const (
22+
service = "optimus"
23+
sampleConfig = `host: optimus.com:80`
24+
prefixBigQuery = "bigquery://"
25+
prefixMaxcompute = "maxcompute://"
26+
27+
var errInvalidDependency = errors.New("invalid dependency")
28+
)
29+
30+
// Register the extractor to catalog
31+
func init() {
32+
if err := registry.Extractors.Register("optimus", func() plugins.Extractor {
33+
return New(plugins.GetLog(), client.New())
34+
}); err != nil {
35+
panic(err)
36+
}
37+
}
38+
39+
//go:embed README.md
40+
var summary string
41+
42+
// Config holds the set of configuration for the bigquery extractor
43+
type Config struct {
44+
Host string `json:"host" yaml:"host" mapstructure:"host" validate:"required"`
45+
MaxSizeInMB int `json:"max_size_in_mb" yaml:"max_size_in_mb" mapstructure:"max_size_in_mb"`
46+
}
47+
48+
var info = plugins.Info{
49+
Description: "Optimus' jobs metadata",
50+
SampleConfig: sampleConfig,
51+
Summary: summary,
52+
Tags: []string{"optimus", "bigquery"},
53+
}
54+
55+
// Extractor manages the communication with the bigquery service
56+
type Extractor struct {
57+
plugins.BaseExtractor
58+
logger log.Logger
59+
config Config
60+
client client.Client
61+
}
62+
63+
func New(l log.Logger, c client.Client) *Extractor {
64+
e := &Extractor{
65+
logger: l,
66+
client: c,
67+
}
68+
e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
69+
70+
return e
71+
}
72+
73+
// Init initializes the extractor
74+
func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
75+
if err := e.BaseExtractor.Init(ctx, config); err != nil {
76+
return err
77+
}
78+
79+
if err := e.client.Connect(ctx, e.config.Host, e.config.MaxSizeInMB); err != nil {
80+
return fmt.Errorf("connect to host: %w", err)
81+
}
82+
83+
return nil
84+
}
85+
86+
// Extract checks if the table is valid and extracts the table schema
87+
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
88+
defer e.client.Close()
89+
90+
projResp, err := e.client.ListProjects(ctx, &pb.ListProjectsRequest{})
91+
if err != nil {
92+
return fmt.Errorf("fetch projects: %w", err)
93+
}
94+
95+
for _, project := range projResp.Projects {
96+
nspaceResp, err := e.client.ListProjectNamespaces(ctx, &pb.ListProjectNamespacesRequest{
97+
ProjectName: project.Name,
98+
})
99+
if err != nil {
100+
e.logger.Error("error fetching namespace list", "project", project.Name, "err", err)
101+
continue
102+
}
103+
104+
for _, namespace := range nspaceResp.Namespaces {
105+
jobResp, err := e.client.ListJobSpecification(ctx, &pb.ListJobSpecificationRequest{
106+
ProjectName: project.Name,
107+
NamespaceName: namespace.Name,
108+
})
109+
if err != nil {
110+
e.logger.Error("error fetching job list", "err", err, "project", project.Name, "namespace", namespace.Name)
111+
continue
112+
}
113+
114+
for _, job := range jobResp.Jobs {
115+
data, err := e.buildJob(ctx, job, project.Name, namespace.Name)
116+
if err != nil {
117+
e.logger.Error(
118+
"error building job model",
119+
"err", err,
120+
"project", project.Name,
121+
"namespace", namespace.Name,
122+
"job", job.Name)
123+
continue
124+
}
125+
126+
emit(models.NewRecord(data))
127+
}
128+
}
129+
}
130+
131+
return nil
132+
}
133+
134+
func (e *Extractor) buildJob(ctx context.Context, jobSpec *pb.JobSpecification, project, namespace string) (*v1beta2.Asset, error) {
135+
jobResp, err := e.client.GetJobTask(ctx, &pb.GetJobTaskRequest{
136+
ProjectName: project,
137+
NamespaceName: namespace,
138+
JobName: jobSpec.Name,
139+
})
140+
if err != nil {
141+
return nil, fmt.Errorf("fetching task: %w", err)
142+
}
143+
144+
task := jobResp.Task
145+
upstreams, downstreams, err := e.buildLineage(task)
146+
if err != nil {
147+
return nil, fmt.Errorf("building lineage: %w", err)
148+
}
149+
150+
jobID := fmt.Sprintf("%s.%s.%s", project, namespace, jobSpec.Name)
151+
urn := models.NewURN(service, e.UrnScope, "job", jobID)
152+
153+
jobModel, err := anypb.New(&v1beta2.Job{
154+
Attributes: utils.TryParseMapToProto(map[string]interface{}{
155+
"version": jobSpec.Version,
156+
"project": project,
157+
"namespace": namespace,
158+
"owner": jobSpec.Owner,
159+
"startDate": strOrNil(jobSpec.StartDate),
160+
"endDate": strOrNil(jobSpec.EndDate),
161+
"interval": jobSpec.Interval,
162+
"dependsOnPast": jobSpec.DependsOnPast,
163+
"catchUp": jobSpec.CatchUp,
164+
"taskName": jobSpec.TaskName,
165+
"windowSize": jobSpec.WindowSize,
166+
"windowOffset": jobSpec.WindowOffset,
167+
"windowTruncateTo": jobSpec.WindowTruncateTo,
168+
"sql": jobSpec.Assets["query.sql"],
169+
"task": map[string]interface{}{
170+
"name": task.Name,
171+
"description": task.Description,
172+
"image": task.Image,
173+
},
174+
}),
175+
})
176+
if err != nil {
177+
return nil, fmt.Errorf("create Any struct: %w", err)
178+
}
179+
180+
return &v1beta2.Asset{
181+
Urn: urn,
182+
Name: jobSpec.Name,
183+
Service: service,
184+
Description: jobSpec.Description,
185+
Type: "job",
186+
Data: jobModel,
187+
Owners: []*v1beta2.Owner{
188+
{
189+
Urn: jobSpec.Owner,
190+
Email: jobSpec.Owner,
191+
},
192+
},
193+
Lineage: &v1beta2.Lineage{
194+
Upstreams: upstreams,
195+
Downstreams: downstreams,
196+
},
197+
}, nil
198+
}
199+
200+
func (e *Extractor) buildLineage(task *pb.JobTask) (upstreams, downstreams []*v1beta2.Resource, err error) {
201+
upstreams, err = e.buildUpstreams(task)
202+
if err != nil {
203+
return nil, nil, fmt.Errorf("build upstreams: %w", err)
204+
}
205+
206+
downstreams, err = e.buildDownstreams(task)
207+
if err != nil {
208+
return nil, nil, fmt.Errorf("build downstreams: %w", err)
209+
}
210+
211+
return upstreams, downstreams, nil
212+
}
213+
214+
func (e *Extractor) buildUpstreams(task *pb.JobTask) ([]*v1beta2.Resource, error) {
215+
var upstreams []*v1beta2.Resource
216+
for _, dependency := range task.Dependencies {
217+
urn, err := plugins.BigQueryTableFQNToURN(
218+
strings.TrimPrefix(dependency.Dependency, "bigquery://"),
219+
)
220+
if err != nil {
221+
return nil, err
222+
}
223+
224+
upstreams = append(upstreams, &v1beta2.Resource{
225+
Urn: urn,
226+
Type: "table",
227+
Service: "bigquery",
228+
})
229+
}
230+
231+
return upstreams, nil
232+
}
233+
234+
func (e *Extractor) buildDownstreams(task *pb.JobTask) ([]*v1beta2.Resource, error) {
235+
if task.Destination == nil || task.Destination.Destination == "" {
236+
return nil, nil
237+
}
238+
239+
urn, err := plugins.BigQueryTableFQNToURN(
240+
strings.TrimPrefix(task.Destination.Destination, "bigquery://"),
241+
)
242+
if err != nil {
243+
return nil, err
244+
}
245+
246+
return []*v1beta2.Resource{{
247+
Urn: urn,
248+
Type: "table",
249+
Service: "bigquery",
250+
}}, nil
251+
}
252+
253+
func strOrNil(s string) interface{} {
254+
if s == "" {
255+
return nil
256+
}
257+
258+
return s
259+
}

0 commit comments

Comments
 (0)