Skip to content

Commit b19cf06

Browse files
authored
Merge pull request #136 from meetwangdk/dk/tkp
Add the TKP feature item.
2 parents 6f2ce75 + 7ca8c4c commit b19cf06

File tree

18 files changed

+1080
-50
lines changed

18 files changed

+1080
-50
lines changed

cmd/grafanadi/main.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"flag"
5+
tkpReqProvider "github.com/alipay/container-observability-service/pkg/tkp_provider"
56
"os"
67
"os/signal"
78
"syscall"
@@ -25,7 +26,7 @@ var (
2526

2627
func newRootCmd() *cobra.Command {
2728
config := &server.ServerConfig{}
28-
var cfgFile, kubeConfigFile string
29+
var cfgFile, kubeConfigFile, tkpRefCfgFile string
2930

3031
cmd := &cobra.Command{
3132
Use: "grafanadi",
@@ -49,6 +50,12 @@ func newRootCmd() *cobra.Command {
4950
panic(err.Error())
5051
}
5152

53+
err = tkpReqProvider.InitTkpReqConfig(tkpRefCfgFile)
54+
if err != nil {
55+
klog.Errorf("failed to init tkp config [%s] err:%s", tkpRefCfgFile, err.Error())
56+
panic(err.Error())
57+
}
58+
5259
serverConfig := &server.ServerConfig{
5360
ListenAddr: config.ListenAddr,
5461
Storage: storage,
@@ -70,6 +77,7 @@ func newRootCmd() *cobra.Command {
7077
// for storage
7178
cmd.PersistentFlags().StringVarP(&cfgFile, "config-file", "", "/app/storage-config.yaml", "storage config file")
7279
cmd.PersistentFlags().StringVarP(&service.GrafanaUrl, "grafana-url", "", "", "grafana url")
80+
cmd.PersistentFlags().StringVarP(&tkpRefCfgFile, "tkp-req-config-file", "", "/app/tkp-req-config-file.json", "tkp req config file")
7381

7482
// kubeconfig for k8s client
7583
cmd.PersistentFlags().StringVarP(&kubeConfigFile, "kubeconfig", "", "/etc/kubernetes/kubeconfig/admin.kubeconfig", "Path to kubeconfig file with authorization and apiserver information.")

deploy/helm/lunettes/templates/grafanadi/grafanadi-cm.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ data:
88
endpoint: "http://es-cluster-svc.{{ .Values.namespace }}:9200"
99
username: {{ .Values.esUser }}
1010
password: {{ .Values.esPassword }}
11+
tkp-req-config-file.json: |
12+
{
13+
"staging":"http://alipay-tkp-manager.tkp.svc.cluster.local:9999"
14+
}
1115
kind: ConfigMap
1216
metadata:
1317
name: grafanadi-cm

deploy/helm/lunettes/templates/grafanadi/grafanadi-deploy.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,9 @@ spec:
6868
name: logs
6969
- mountPath: /var/grafana
7070
name: grafana-pv
71-
- mountPath: /app/config-file.yaml
71+
- mountPath: /app
7272
name: cm-vol
7373
readOnly: true
74-
subPath: config-file.yaml
7574
volumes:
7675
- name: grafana-pv
7776
hostPath:
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
package handler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
eavesmodel "github.com/alipay/container-observability-service/internal/grafanadi/model"
7+
"github.com/alipay/container-observability-service/internal/grafanadi/service"
8+
interutils "github.com/alipay/container-observability-service/internal/grafanadi/utils"
9+
"github.com/alipay/container-observability-service/pkg/dal/storage-client/data_access"
10+
"github.com/alipay/container-observability-service/pkg/dal/storage-client/model"
11+
"github.com/alipay/container-observability-service/pkg/metrics"
12+
"github.com/alipay/container-observability-service/pkg/utils"
13+
"github.com/olivere/elastic/v7"
14+
"k8s.io/klog/v2"
15+
"net/http"
16+
"time"
17+
)
18+
19+
type OwnerPodMapHandler struct {
20+
request *http.Request
21+
writer http.ResponseWriter
22+
requestParams *OwnerPodMapParams
23+
storage data_access.StorageInterface
24+
}
25+
26+
func (handler *OwnerPodMapHandler) GetOwnerPodMap(debugfrom, key, value string) (int, interface{}, error) {
27+
sloTraceData := make([]*model.SloTraceData, 0)
28+
result := []model.SloTraceData{}
29+
podYamls := make([]*model.PodYaml, 0)
30+
begin := time.Now()
31+
defer func() {
32+
cost := utils.TimeSinceInMilliSeconds(begin)
33+
metrics.QueryMethodDurationMilliSeconds.WithLabelValues("GetOwnerPodMap").Observe(cost)
34+
}()
35+
if debugfrom == "pod" {
36+
// get owneref pod with pod key/value
37+
util := interutils.Util{
38+
Storage: handler.storage,
39+
}
40+
py, err := util.GetPodYaml(podYamls, key, value)
41+
if err != nil || len(py) == 0 {
42+
return http.StatusOK, eavesmodel.DataFrame{}, err
43+
}
44+
if py[0].Pod == nil {
45+
return http.StatusOK, eavesmodel.DataFrame{}, err
46+
}
47+
48+
if len(py[0].Pod.OwnerReferences) != 0 {
49+
or := py[0].Pod.OwnerReferences[0]
50+
value = string(or.UID)
51+
}
52+
} else {
53+
switch key {
54+
case "name":
55+
uid, err := findUniqueId(value, handler.storage)
56+
klog.Info("uid is %s", uid)
57+
if err != nil {
58+
klog.Errorf("findUniqueId error, error is %s", err)
59+
return http.StatusOK, eavesmodel.DataFrame{}, err
60+
}
61+
value = uid
62+
default:
63+
fmt.Println("currently only supports uid or name")
64+
return http.StatusOK, eavesmodel.DataFrame{}, nil
65+
}
66+
}
67+
if value == "" {
68+
return http.StatusOK, eavesmodel.DataFrame{}, nil
69+
}
70+
err := handler.storage.QuerySloTraceDataWithOwnerId(&sloTraceData, value,
71+
model.WithFrom(handler.requestParams.From),
72+
model.WithTo(handler.requestParams.To),
73+
model.WithLimit(1000))
74+
if err != nil {
75+
return http.StatusOK, eavesmodel.DataFrame{}, fmt.Errorf("QuerySloTraceDataWithOwnerId error, error is %s", err)
76+
}
77+
for _, std := range sloTraceData {
78+
if std.Type == "create" || std.Type == "delete" {
79+
found := false
80+
for i, pod := range result {
81+
if pod.PodUID == std.PodUID {
82+
if std.Type == "create" {
83+
result[i].CreatedTime = std.CreatedTime
84+
result[i].OwnerRefStr = std.OwnerRefStr
85+
if std.RunningAt.After(std.ReadyAt) {
86+
std.ReadyAt = std.RunningAt
87+
}
88+
result[i].ReadyAt = std.ReadyAt
89+
result[i].SLOViolationReason = std.SLOViolationReason
90+
} else {
91+
result[i].DeletedTime = std.CreatedTime
92+
result[i].DeleteEndTime = std.DeleteEndTime
93+
result[i].DeleteResult = std.DeleteResult
94+
}
95+
found = true
96+
}
97+
}
98+
if !found {
99+
if std.RunningAt.After(std.ReadyAt) {
100+
std.ReadyAt = std.RunningAt
101+
}
102+
if std.Type == "delete" {
103+
std.DeletedTime = std.CreatedTime
104+
}
105+
result = append(result, *std)
106+
}
107+
}
108+
}
109+
return http.StatusOK, service.ConvertSloDataTrace2Graph(result), nil
110+
}
111+
112+
// OwnerPodMapParams carries the query parameters of an owner -> pod map request.
type OwnerPodMapParams struct {
	// Key is the "searchkey" query parameter.
	Key string
	// Value is the "searchvalue" query parameter.
	Value string
	// DebugFrom is the "debugfrom" query parameter; "pod" means Key/Value
	// identify a pod rather than a workload.
	DebugFrom string

	// From and To bound the time range of the SLO trace data query.
	From time.Time
	To   time.Time
}
121+
122+
func (handler *OwnerPodMapHandler) RequestParams() interface{} {
123+
return handler.requestParams
124+
}
125+
126+
func (handler *OwnerPodMapHandler) ParseRequest() error {
127+
params := OwnerPodMapParams{}
128+
if handler.request.Method == http.MethodGet {
129+
key := handler.request.URL.Query().Get("searchkey")
130+
value := handler.request.URL.Query().Get("searchvalue")
131+
debugfrom := handler.request.URL.Query().Get("debugfrom")
132+
params.Key = key
133+
params.Value = value
134+
params.DebugFrom = debugfrom
135+
136+
setTPLayout(handler.request.URL.Query(), "from", &params.From)
137+
setTPLayout(handler.request.URL.Query(), "to", &params.To)
138+
}
139+
140+
handler.requestParams = &params
141+
return nil
142+
}
143+
144+
func (handler *OwnerPodMapHandler) ValidRequest() error {
145+
146+
return nil
147+
}
148+
149+
func OwnerPodMapFactory(w http.ResponseWriter, r *http.Request, storage data_access.StorageInterface) Handler {
150+
return &OwnerPodMapHandler{
151+
request: r,
152+
writer: w,
153+
storage: storage,
154+
}
155+
}
156+
157+
func (handler *OwnerPodMapHandler) Process() (int, interface{}, error) {
158+
defer utils.IgnorePanic("ContainerlifecycleHandler.Process ")
159+
160+
var result interface{}
161+
var err error
162+
var httpStatus int
163+
164+
httpStatus, result, err = handler.GetOwnerPodMap(handler.requestParams.DebugFrom, handler.requestParams.Key, handler.requestParams.Value)
165+
166+
return httpStatus, result, err
167+
}
168+
169+
func findUniqueId(workloadName string, storage data_access.StorageInterface) (uid string, err error) {
170+
esClient, ok := storage.(*data_access.StorageEsImpl)
171+
if !ok {
172+
err = fmt.Errorf("parse errror")
173+
return
174+
}
175+
query := elastic.NewBoolQuery().
176+
Must(
177+
elastic.NewTermQuery("ExtraProperties.ownerref.name.Value.keyword", workloadName),
178+
elastic.NewExistsQuery("ExtraProperties.ownerref.uid.Value.keyword"),
179+
elastic.NewExistsQuery("ExtraProperties.ownerref.name.Value.keyword"),
180+
)
181+
aggs := elastic.NewTermsAggregation().
182+
Field("ExtraProperties.ownerref.name.Value.keyword").
183+
SubAggregation("group_by_ownerref_uid", elastic.NewTermsAggregation().Field("ExtraProperties.ownerref.uid.Value.keyword"))
184+
185+
searchResult, err := esClient.DB.Search().
186+
Index("slo_trace_data_daily").
187+
Query(query).
188+
Size(0).
189+
Aggregation("group_by_ownerref_name", aggs).
190+
Do(context.Background())
191+
if err != nil {
192+
err = fmt.Errorf("failed to execute search query: %v", err)
193+
klog.Errorf("Failed to execute search query: %v", err)
194+
return
195+
}
196+
197+
if agg, found := searchResult.Aggregations.Terms("group_by_ownerref_name"); found {
198+
for _, bucket := range agg.Buckets {
199+
if uidAgg, uidFound := bucket.Aggregations.Terms("group_by_ownerref_uid"); uidFound {
200+
for _, detail := range uidAgg.Buckets {
201+
if strKey, ok := detail.Key.(string); ok {
202+
return strKey, nil
203+
} else {
204+
return "", fmt.Errorf("workload uid key is not a string")
205+
}
206+
}
207+
}
208+
break
209+
}
210+
} else {
211+
klog.Infof("No aggs aggregation found")
212+
}
213+
return
214+
}

0 commit comments

Comments
 (0)