Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/driver/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ type ChunkReader interface {
Next() (io.Reader, int64, error)
}

// EmptyChunkReader is a ChunkReader implementation that contains no chunks.
// Its Next method always returns io.EOF, indicating that no data is available.
type EmptyChunkReader struct{}

// Next always returns io.EOF because there are no chunks.
func (EmptyChunkReader) Next() (io.Reader, int64, error) {
return nil, 0, io.EOF
}

type BigData interface {
Get(index int) ([]byte, error)
Reader() (ChunkReader, error)
Expand Down
5 changes: 5 additions & 0 deletions pkg/driver/device_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ type DeviceManager interface {
GetMetricsReader(u uuid.UUID) (common.ChunkReader, error)
// GetRequestsReader get the request logs for a given uuid
GetRequestsReader(u uuid.UUID) (common.ChunkReader, error)
// GetAppLogsReader returns a logs reader for the specified application
// on the given device.
GetAppLogsReader(devID, appID uuid.UUID) (common.ChunkReader, error)
// GetFlowMessageReader returns a flow-message reader for the specified device.
GetFlowMessageReader(devID uuid.UUID) (common.ChunkReader, error)
// GetCerts retrieve the attest certs for a particular device
GetCerts(uid uuid.UUID) ([]byte, error)
// GetStorageKeys retrieve storage keys for a particular device
Expand Down
40 changes: 36 additions & 4 deletions pkg/driver/file/device_manager_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func (d *DeviceManager) initDevice(u uuid.UUID) error {
}

// create the necessary directories for data uploads
for _, p := range []string{logDir, metricsDir, infoDir, requestsDir} {
for _, p := range []string{logDir, metricsDir, infoDir, requestsDir, flowMessageDir} {
cur := path.Join(devicePath, p)
err = os.MkdirAll(cur, 0755)
if err != nil {
Expand Down Expand Up @@ -740,6 +740,9 @@ func (d *DeviceManager) appExists(u, instanceID uuid.UUID) bool {
if _, ok := d.devices[u]; !ok {
return false
}
if _, ok := d.devices[u].AppLogs[instanceID]; !ok {
return false
}
return true
}

Expand All @@ -755,8 +758,12 @@ func (d *DeviceManager) WriteAppInstanceLogs(instanceID uuid.UUID, deviceID uuid
return fmt.Errorf("unregistered device UUID: %s", deviceID)
}
if !d.appExists(deviceID, instanceID) {
appDir := d.getAppPath(deviceID, instanceID)
if err := os.MkdirAll(appDir, 0755); err != nil {
return fmt.Errorf("failed to create app log directory %s: %v", appDir, err)
}
d.devices[deviceID].AppLogs[instanceID] = &ManagedFile{
dir: d.getAppPath(deviceID, instanceID),
dir: appDir,
maxSize: int64(d.maxAppLogsSize),
}
}
Expand Down Expand Up @@ -1084,9 +1091,9 @@ func (d *DeviceManager) getOnboardPath(cn string) string {
return path.Join(d.databasePath, onboardDir, cn)
}

func openTimestampFile(filename string) (*os.File, error) {
func openTimestampFile(dirname string) (*os.File, error) {
// open a new one
fullPath := path.Join(filename, time.Now().Format("2006-01-02T15:04:05.111"))
fullPath := path.Join(dirname, fmt.Sprintf("%020d", time.Now().UnixNano()))
return os.Create(fullPath)
}

Expand Down Expand Up @@ -1222,6 +1229,31 @@ func (d *DeviceManager) GetRequestsReader(u uuid.UUID) (common.ChunkReader, erro
return dev.Requests.Reader()
}

// GetAppLogsReader returns a logs reader for the specified application
// on the given device.
func (d *DeviceManager) GetAppLogsReader(devID, appID uuid.UUID) (common.ChunkReader, error) {
d.m.Lock()
defer d.m.Unlock()
if !d.deviceExists(devID) {
return nil, fmt.Errorf("unregistered device UUID: %s", devID)
}
if !d.appExists(devID, appID) {
return common.EmptyChunkReader{}, nil
}
return d.devices[devID].AppLogs[appID].Reader()
}

// GetFlowMessageReader returns a flow-message reader for the specified device.
func (d *DeviceManager) GetFlowMessageReader(devID uuid.UUID) (common.ChunkReader, error) {
d.m.Lock()
defer d.m.Unlock()
// check that the device actually exists
if !d.deviceExists(devID) {
return nil, fmt.Errorf("unregistered device UUID: %s", devID)
}
return d.devices[devID].FlowMessage.Reader()
}

// WriteFlowMessage write FlowMessage
func (d *DeviceManager) WriteFlowMessage(u uuid.UUID, b []byte) error {
// make sure it is not nil
Expand Down
26 changes: 26 additions & 0 deletions pkg/driver/memory/device_manager_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,32 @@ func (d *DeviceManager) GetRequestsReader(u uuid.UUID) (common.ChunkReader, erro
return dev.Requests.Reader()
}

// GetAppLogsReader returns a logs reader for the specified application
// on the given device.
func (d *DeviceManager) GetAppLogsReader(devID, appID uuid.UUID) (common.ChunkReader, error) {
d.m.Lock()
defer d.m.Unlock()
dev, ok := d.devices[devID]
if !ok {
return nil, fmt.Errorf("unregistered device UUID %s", devID.String())
}
if !d.appExists(devID, appID) {
return common.EmptyChunkReader{}, nil
}
return dev.AppLogs[appID].Reader()
}

// GetFlowMessageReader returns a flow-message reader for the specified device.
func (d *DeviceManager) GetFlowMessageReader(devID uuid.UUID) (common.ChunkReader, error) {
d.m.Lock()
defer d.m.Unlock()
dev, ok := d.devices[devID]
if !ok {
return nil, fmt.Errorf("unregistered device UUID %s", devID.String())
}
return dev.FlowMessage.Reader()
}

// WriteFlowMessage write FlowMessage
func (d *DeviceManager) WriteFlowMessage(u uuid.UUID, b []byte) error {
// make sure it is not nil
Expand Down
26 changes: 26 additions & 0 deletions pkg/driver/redis/device_manager_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,32 @@ func (d *DeviceManager) GetRequestsReader(u uuid.UUID) (common.ChunkReader, erro
return dev.Requests.Reader()
}

// GetAppLogsReader returns a logs reader for the specified application
// on the given device.
func (d *DeviceManager) GetAppLogsReader(devID, appID uuid.UUID) (common.ChunkReader, error) {
d.m.Lock()
defer d.m.Unlock()
dev, ok := d.devices[devID]
if !ok {
return nil, fmt.Errorf("unregistered device UUID %s", devID.String())
}
if !d.appExists(devID, appID) {
return common.EmptyChunkReader{}, nil
}
return dev.AppLogs[appID].Reader()
}

// GetFlowMessageReader returns a flow-message reader for the specified device.
func (d *DeviceManager) GetFlowMessageReader(devID uuid.UUID) (common.ChunkReader, error) {
d.m.Lock()
defer d.m.Unlock()
dev, ok := d.devices[devID]
if !ok {
return nil, fmt.Errorf("unregistered device UUID: %s", devID)
}
return dev.FlowMessage.Reader()
}

// refreshCache refresh cache from disk.
// If isLocked is true the caller already holds d.m; otherwise refreshCache acquires it.
func (d *DeviceManager) refreshCache(isLocked bool) error {
Expand Down
95 changes: 44 additions & 51 deletions pkg/server/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/lf-edge/adam/pkg/driver/common"
ax "github.com/lf-edge/adam/pkg/x509"
"github.com/lf-edge/eve-api/go/config"
"github.com/lf-edge/eve-api/go/info"
"github.com/lf-edge/eve-api/go/metrics"
uuid "github.com/satori/go.uuid"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
Expand All @@ -36,6 +34,7 @@ type adminHandler struct {
infoStream *stream
requestsStream *stream
metricsStream *stream
flowlogsStream *stream
}

// OnboardCert encoding for sending an onboard cert and serials via json
Expand Down Expand Up @@ -421,55 +420,63 @@ func (h *adminHandler) deviceConfigSet(w http.ResponseWriter, r *http.Request) {
}

func (h *adminHandler) deviceLogsGet(w http.ResponseWriter, r *http.Request) {
h.deviceDataGet(w, r, h.logStream, h.manager.GetLogsReader, nil)
readerFunc := func(id instanceID) (common.ChunkReader, error) {
return h.manager.GetLogsReader(id.devUUID)
}
h.deviceDataGet(w, r, h.logStream, readerFunc)
}

func (h *adminHandler) deviceInfoGet(w http.ResponseWriter, r *http.Request) {
h.deviceDataGet(w, r, h.infoStream, h.manager.GetInfoReader, func(in []byte) ([]byte, error) {
var err error
msg := &info.ZInfoMsg{}
if err = proto.Unmarshal(in, msg); err != nil {
return nil, fmt.Errorf("error parsing info message: %v", err)
}
var entryBytes []byte
if entryBytes, err = protojson.Marshal(msg); err != nil {
return nil, fmt.Errorf("failed to marshal info message: %v", err)
}
return entryBytes, nil
})
readerFunc := func(id instanceID) (common.ChunkReader, error) {
return h.manager.GetInfoReader(id.devUUID)
}
h.deviceDataGet(w, r, h.infoStream, readerFunc)
}

func (h *adminHandler) deviceRequestsGet(w http.ResponseWriter, r *http.Request) {
h.deviceDataGet(w, r, h.requestsStream, h.manager.GetRequestsReader, nil)
readerFunc := func(id instanceID) (common.ChunkReader, error) {
return h.manager.GetRequestsReader(id.devUUID)
}
h.deviceDataGet(w, r, h.requestsStream, readerFunc)
}

func (h *adminHandler) deviceMetricsGet(w http.ResponseWriter, r *http.Request) {
h.deviceDataGet(w, r, h.metricsStream, h.manager.GetMetricsReader, func(in []byte) ([]byte, error) {
var err error
msg := &metrics.ZMetricMsg{}
if err = proto.Unmarshal(in, msg); err != nil {
return nil, fmt.Errorf("error parsing metrics message: %v", err)
}
var entryBytes []byte
if entryBytes, err = protojson.Marshal(msg); err != nil {
return nil, fmt.Errorf("failed to marshal metrics message: %v", err)
}
return entryBytes, nil
})
readerFunc := func(id instanceID) (common.ChunkReader, error) {
return h.manager.GetMetricsReader(id.devUUID)
}
h.deviceDataGet(w, r, h.metricsStream, readerFunc)
}

func (h *adminHandler) appLogsGet(w http.ResponseWriter, r *http.Request) {
readerFunc := func(id instanceID) (common.ChunkReader, error) {
return h.manager.GetAppLogsReader(id.devUUID, id.appUUID)
}
h.deviceDataGet(w, r, h.logStream, readerFunc)
}

func (h *adminHandler) deviceFlowlogsGet(w http.ResponseWriter, r *http.Request) {
readerFunc := func(id instanceID) (common.ChunkReader, error) {
return h.manager.GetFlowMessageReader(id.devUUID)
}
h.deviceDataGet(w, r, h.flowlogsStream, readerFunc)
}

func (h *adminHandler) deviceDataGet(w http.ResponseWriter, r *http.Request,
s *stream, readerFunc func(u uuid.UUID) (common.ChunkReader, error),
conversionFunc func(in []byte) ([]byte, error)) {
u := mux.Vars(r)["uuid"]
uid, err := uuid.FromString(u)
s *stream, readerFunc func(instanceID) (common.ChunkReader, error)) {
var id instanceID
var err error
uuidStr := mux.Vars(r)["uuid"]
id.devUUID, err = uuid.FromString(uuidStr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
conversionRequired := false
if conversionFunc != nil {
conversionRequired = acceptJSON(r)
if uuidStr, hasAppUUID := mux.Vars(r)["appuuid"]; hasAppUUID {
id.appUUID, err = uuid.FromString(uuidStr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
watch := r.Header.Get(StreamHeader)
if watch == StreamValue {
Expand All @@ -490,19 +497,12 @@ func (h *adminHandler) deviceDataGet(w http.ResponseWriter, r *http.Request,
w.Header().Set("Content-type", "application/json")
flusher.Flush()

c, unsubscribe := s.subscribe(uid)
c, unsubscribe := s.subscribe(id)
defer unsubscribe()

for {
select {
case b := <-c:
if conversionRequired {
b, err = conversionFunc(b)
if err != nil {
log.Printf("conversionFunc failed: %v", err)
continue
}
}
w.Write(append(b, 0x0a))
flusher.Flush()
case <-cn.CloseNotify():
Expand All @@ -512,7 +512,7 @@ func (h *adminHandler) deviceDataGet(w http.ResponseWriter, r *http.Request,
}
} else {
for {
chunk, err := readerFunc(uid)
chunk, err := readerFunc(id)
_, isNotFound := err.(*common.NotFoundError)
switch {
case err != nil && isNotFound:
Expand All @@ -538,13 +538,6 @@ func (h *adminHandler) deviceDataGet(w http.ResponseWriter, r *http.Request,
http.Error(w, fmt.Sprintf("error reading data: %v", err), http.StatusInternalServerError)
continue
}
if conversionRequired {
buf, err = conversionFunc(buf)
if err != nil {
log.Printf("conversionFunc failed: %v", err)
continue
}
}
w.Write(append(buf, 0x0a))
}
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/server/apiHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type apiHandler struct {
infoStream *stream
metricStream *stream
requestsStream *stream
flowlogsStream *stream
}

// GetUser godoc
Expand Down Expand Up @@ -51,7 +52,7 @@ func (h *apiHandler) recordClient(u *uuid.UUID, r *http.Request) {
log.Printf("error saving request structure: %v", err)
return
}
h.requestsStream.publish(*u, b)
h.requestsStream.publish(instanceID{devUUID: *u}, b)
h.manager.WriteRequest(*u, b)
}

Expand Down Expand Up @@ -246,7 +247,7 @@ func (h *apiHandler) appLogs(w http.ResponseWriter, r *http.Request) {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
status, err := appLogsProcess(h.manager, *u, uid, b)
status, err := appLogsProcess(h.manager, h.logStream, *u, uid, b)
if err != nil {
log.Printf("Failed to logsProcess: %v", err)
http.Error(w, http.StatusText(status), status)
Expand Down Expand Up @@ -281,7 +282,7 @@ func (h *apiHandler) newAppLogs(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
status, err := newAppLogsProcess(h.manager, *u, uid, r.Body)
status, err := newAppLogsProcess(h.manager, h.logStream, *u, uid, r.Body)
if err != nil {
log.Printf("Failed to logsProcess: %v", err)
http.Error(w, http.StatusText(status), status)
Expand Down Expand Up @@ -317,7 +318,7 @@ func (h *apiHandler) flowLog(w http.ResponseWriter, r *http.Request) {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
status, err := flowLogProcess(h.manager, *u, b)
status, err := flowLogProcess(h.manager, h.flowlogsStream, *u, b)
if err != nil {
log.Printf("Failed to logsProcess: %v", err)
http.Error(w, http.StatusText(status), status)
Expand Down
Loading
Loading