Skip to content

Commit 2291778

Browse files
author
mwallschlaeger
authored
Merge pull request #25 from bitflow-stream/anton-fix-code-smells
Fix some code smells
2 parents 740527b + 4a882d9 commit 2291778

21 files changed

+292
-258
lines changed

bitflow/endpoints.go

Lines changed: 108 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -296,52 +296,10 @@ func (f *EndpointFactory) CreateInput(inputs ...string) (SampleSource, error) {
296296
return nil, fmt.Errorf("Format cannot be specified for data input: %v", input)
297297
}
298298
if result == nil {
299-
reader := f.Reader(nil) // nil as Unmarshaller makes the SampleSource auto-detect the format
300-
if f.FlagSourceTag != "" {
301-
reader.Handler = sourceTagger(f.FlagSourceTag)
302-
}
303299
inputType = endpoint.Type
304-
switch endpoint.Type {
305-
case StdEndpoint:
306-
source := NewConsoleSource()
307-
source.Reader = reader
308-
result = source
309-
case TcpEndpoint, HttpEndpoint:
310-
source := &TCPSource{
311-
RemoteAddrs: []string{endpoint.Target},
312-
PrintErrors: !f.FlagTcpSourceDropErrors,
313-
RetryInterval: tcp_download_retry_interval,
314-
DialTimeout: tcp_dial_timeout,
315-
UseHTTP: endpoint.Type == HttpEndpoint,
316-
}
317-
source.TcpConnLimit = f.FlagTcpConnectionLimit
318-
source.Reader = reader
319-
result = source
320-
case TcpListenEndpoint:
321-
source := NewTcpListenerSource(endpoint.Target)
322-
source.SimultaneousConnections = f.FlagInputTcpAcceptLimit
323-
source.TcpConnLimit = f.FlagTcpConnectionLimit
324-
source.Reader = reader
325-
result = source
326-
case FileEndpoint:
327-
source := &FileSource{
328-
FileNames: []string{endpoint.Target},
329-
IoBuffer: f.FlagIoBuffer,
330-
Robust: f.FlagInputFilesRobust,
331-
KeepAlive: f.FlagFilesKeepAlive,
332-
}
333-
source.Reader = reader
334-
result = source
335-
default:
336-
if factory, ok := f.CustomDataSources[endpoint.Type]; ok && endpoint.IsCustomType {
337-
var factoryErr error
338-
result, factoryErr = factory(endpoint.Target)
339-
if factoryErr != nil {
340-
return nil, fmt.Errorf("Error creating '%v' input: %v", endpoint.Type, factoryErr)
341-
}
342-
} else {
343-
return nil, errors.New("Unknown input endpoint type: " + string(endpoint.Type))
344-
}
300+
result, err = f.createInput(endpoint)
301+
if err != nil {
302+
return nil, err
345303
}
346304
} else {
347305
if inputType != endpoint.Type {
@@ -369,6 +327,55 @@ func (f *EndpointFactory) CreateInput(inputs ...string) (SampleSource, error) {
369327
return result, nil
370328
}
371329

330+
func (f *EndpointFactory) createInput(endpoint EndpointDescription) (SampleSource, error) {
331+
reader := f.Reader(nil) // nil as Unmarshaller makes the SampleSource auto-detect the format
332+
if f.FlagSourceTag != "" {
333+
reader.Handler = sourceTagger(f.FlagSourceTag)
334+
}
335+
switch endpoint.Type {
336+
case StdEndpoint:
337+
source := NewConsoleSource()
338+
source.Reader = reader
339+
return source, nil
340+
case TcpEndpoint, HttpEndpoint:
341+
source := &TCPSource{
342+
RemoteAddrs: []string{endpoint.Target},
343+
PrintErrors: !f.FlagTcpSourceDropErrors,
344+
RetryInterval: tcp_download_retry_interval,
345+
DialTimeout: tcp_dial_timeout,
346+
UseHTTP: endpoint.Type == HttpEndpoint,
347+
}
348+
source.TcpConnLimit = f.FlagTcpConnectionLimit
349+
source.Reader = reader
350+
return source, nil
351+
case TcpListenEndpoint:
352+
source := NewTcpListenerSource(endpoint.Target)
353+
source.SimultaneousConnections = f.FlagInputTcpAcceptLimit
354+
source.TcpConnLimit = f.FlagTcpConnectionLimit
355+
source.Reader = reader
356+
return source, nil
357+
case FileEndpoint:
358+
source := &FileSource{
359+
FileNames: []string{endpoint.Target},
360+
IoBuffer: f.FlagIoBuffer,
361+
Robust: f.FlagInputFilesRobust,
362+
KeepAlive: f.FlagFilesKeepAlive,
363+
}
364+
source.Reader = reader
365+
return source, nil
366+
default:
367+
if factory, ok := f.CustomDataSources[endpoint.Type]; ok && endpoint.IsCustomType {
368+
result, err := factory(endpoint.Target)
369+
if err != nil {
370+
err = fmt.Errorf("Error creating '%v' input: %v", endpoint.Type, err)
371+
}
372+
return result, err
373+
} else {
374+
return nil, errors.New("Unknown input endpoint type: " + string(endpoint.Type))
375+
}
376+
}
377+
}
378+
372379
// Writer returns an instance of SampleWriter, configured by the values stored in the EndpointFactory.
373380
func (f *EndpointFactory) Writer() SampleWriter {
374381
return SampleWriter{f.FlagParallelHandler}
@@ -377,7 +384,6 @@ func (f *EndpointFactory) Writer() SampleWriter {
377384
// CreateInput creates a SampleSink object based on the given output endpoint description
378385
// and the configuration flags in the EndpointFactory.
379386
func (f *EndpointFactory) CreateOutput(output string) (SampleProcessor, error) {
380-
var resultSink SampleProcessor
381387
endpoint, err := f.ParseEndpointDescription(output, true)
382388
if err != nil {
383389
return nil, err
@@ -389,18 +395,28 @@ func (f *EndpointFactory) CreateOutput(output string) (SampleProcessor, error) {
389395
return nil, err
390396
}
391397
}
392-
var marshallingSink *AbstractMarshallingSampleOutput
398+
resultSink, marshallingSink, err := f.createOutput(endpoint, &marshaller)
399+
if err != nil {
400+
return nil, err
401+
}
402+
if marshallingSink != nil {
403+
marshallingSink.SetMarshaller(marshaller)
404+
marshallingSink.Writer = f.Writer()
405+
}
406+
return resultSink, nil
407+
}
408+
409+
func (f *EndpointFactory) createOutput(endpoint EndpointDescription, marshaller *Marshaller) (SampleProcessor, *AbstractMarshallingSampleOutput, error) {
393410
switch endpoint.Type {
394411
case StdEndpoint:
395412
sink := NewConsoleSink()
396-
marshallingSink = &sink.AbstractMarshallingSampleOutput
397-
if txt, ok := marshaller.(TextMarshaller); ok {
398-
txt.AssumeStdout = true
399-
marshaller = txt
400-
} else if txt, ok := marshaller.(*TextMarshaller); ok {
413+
if txt, ok := (*marshaller).(TextMarshaller); ok {
401414
txt.AssumeStdout = true
415+
*marshaller = txt
416+
} else if txt2, ok2 := (*marshaller).(*TextMarshaller); ok2 {
417+
txt2.AssumeStdout = true
402418
}
403-
resultSink = sink
419+
return sink, &sink.AbstractMarshallingSampleOutput, nil
404420
case FileEndpoint:
405421
sink := &FileSink{
406422
Filename: endpoint.Target,
@@ -409,8 +425,7 @@ func (f *EndpointFactory) CreateOutput(output string) (SampleProcessor, error) {
409425
Append: f.FlagFilesAppend,
410426
VanishedFileCheck: f.FlagFileVanishedCheck,
411427
}
412-
marshallingSink = &sink.AbstractMarshallingSampleOutput
413-
resultSink = sink
428+
return sink, &sink.AbstractMarshallingSampleOutput, nil
414429
case TcpEndpoint:
415430
sink := &TCPSink{
416431
Endpoint: endpoint.Target,
@@ -420,8 +435,7 @@ func (f *EndpointFactory) CreateOutput(output string) (SampleProcessor, error) {
420435
if f.FlagTcpLogReceivedData {
421436
sink.LogReceivedTraffic = log.ErrorLevel
422437
}
423-
marshallingSink = &sink.AbstractMarshallingSampleOutput
424-
resultSink = sink
438+
return sink, &sink.AbstractMarshallingSampleOutput, nil
425439
case TcpListenEndpoint:
426440
sink := &TCPListenerSink{
427441
Endpoint: endpoint.Target,
@@ -431,12 +445,11 @@ func (f *EndpointFactory) CreateOutput(output string) (SampleProcessor, error) {
431445
if f.FlagTcpLogReceivedData {
432446
sink.LogReceivedTraffic = log.ErrorLevel
433447
}
434-
marshallingSink = &sink.AbstractMarshallingSampleOutput
435-
resultSink = sink
448+
return sink, &sink.AbstractMarshallingSampleOutput, nil
436449
case HttpEndpoint:
437450
theUrl, err := url.Parse("http://" + endpoint.Target)
438451
if err != nil {
439-
return nil, err
452+
return nil, nil, err
440453
}
441454
sink := &HttpServerSink{
442455
Endpoint: theUrl.Host,
@@ -448,24 +461,18 @@ func (f *EndpointFactory) CreateOutput(output string) (SampleProcessor, error) {
448461
if f.FlagTcpLogReceivedData {
449462
sink.LogReceivedTraffic = log.ErrorLevel
450463
}
451-
marshallingSink = &sink.AbstractMarshallingSampleOutput
452-
resultSink = sink
464+
return sink, &sink.AbstractMarshallingSampleOutput, nil
453465
default:
454466
if factory, ok := f.CustomDataSinks[endpoint.Type]; ok && endpoint.IsCustomType {
455-
var factoryErr error
456-
resultSink, factoryErr = factory(endpoint.Target)
457-
if factoryErr != nil {
458-
return nil, fmt.Errorf("Error creating '%v' output: %v", endpoint.Type, factoryErr)
467+
sink, err := factory(endpoint.Target)
468+
if err != nil {
469+
err = fmt.Errorf("Error creating '%v' output: %v", endpoint.Type, err)
459470
}
471+
return sink, nil, err
460472
} else {
461-
return nil, errors.New("Unknown output endpoint type: " + string(endpoint.Type))
473+
return nil, nil, errors.New("Unknown output endpoint type: " + string(endpoint.Type))
462474
}
463475
}
464-
if marshallingSink != nil {
465-
marshallingSink.SetMarshaller(marshaller)
466-
marshallingSink.Writer = f.Writer()
467-
}
468-
return resultSink, nil
469476
}
470477

471478
func (f *EndpointFactory) CreateMarshaller(format MarshallingFormat) (Marshaller, error) {
@@ -562,31 +569,8 @@ func (f *EndpointFactory) ParseUrlEndpointDescription(endpoint string) (res Endp
562569
target := urlParts[1]
563570
res.Target = target
564571
for _, part := range strings.Split(urlParts[0], "+") {
565-
// TODO unclean: this parsing method is used for both marshalling/unmarshalling endpoints
566-
if f.isMarshallingFormat(part) {
567-
if res.Format != "" {
568-
err = fmt.Errorf("Multiple formats defined in: %v", endpoint)
569-
return
570-
}
571-
res.Format = MarshallingFormat(part)
572-
} else {
573-
if res.Type != UndefinedEndpoint {
574-
err = fmt.Errorf("Multiple transport types defined: %v", endpoint)
575-
return
576-
}
577-
switch EndpointType(part) {
578-
case TcpEndpoint, TcpListenEndpoint, FileEndpoint, HttpEndpoint:
579-
res.Type = EndpointType(part)
580-
case StdEndpoint:
581-
if target != stdTransportTarget {
582-
err = fmt.Errorf("Transport '%v' can only be defined with target '%v'", part, stdTransportTarget)
583-
return
584-
}
585-
res.Type = StdEndpoint
586-
default:
587-
res.IsCustomType = true
588-
res.Type = EndpointType(part)
589-
}
572+
if err = f.parseEndpointUrlPart(part, &res, endpoint, target); err != nil {
573+
return
590574
}
591575
}
592576
if res.Type == UndefinedEndpoint {
@@ -602,6 +586,33 @@ func (f *EndpointFactory) ParseUrlEndpointDescription(endpoint string) (res Endp
602586
return
603587
}
604588

589+
func (f *EndpointFactory) parseEndpointUrlPart(part string, res *EndpointDescription, endpoint, target string) error {
590+
// TODO unclean: this parsing method is used for both marshalling/unmarshalling endpoints
591+
if f.isMarshallingFormat(part) {
592+
if res.Format != "" {
593+
return fmt.Errorf("Multiple formats defined in: %v", endpoint)
594+
}
595+
res.Format = MarshallingFormat(part)
596+
} else {
597+
if res.Type != UndefinedEndpoint {
598+
return fmt.Errorf("Multiple transport types defined: %v", endpoint)
599+
}
600+
switch EndpointType(part) {
601+
case TcpEndpoint, TcpListenEndpoint, FileEndpoint, HttpEndpoint:
602+
res.Type = EndpointType(part)
603+
case StdEndpoint:
604+
if target != stdTransportTarget {
605+
return fmt.Errorf("Transport '%v' can only be defined with target '%v'", part, stdTransportTarget)
606+
}
607+
res.Type = StdEndpoint
608+
default:
609+
res.IsCustomType = true
610+
res.Type = EndpointType(part)
611+
}
612+
}
613+
return nil
614+
}
615+
605616
func (f *EndpointFactory) isMarshallingFormat(formatName string) bool {
606617
_, ok := f.Marshallers[MarshallingFormat(formatName)]
607618
return ok

bitflow/fork/fork_distributors.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ func (d *PipelineCache) getPipelines(key string, build PipelineBuildFunc) ([]Sub
130130
}
131131
d.pipelines[key] = pipes
132132
}
133+
d.updateSortedPipelineKeys(key, pipes)
134+
result := make([]Subpipeline, len(pipes))
135+
for i, pipe := range pipes {
136+
result[i].Key = key
137+
result[i].Pipe = pipe
138+
}
139+
return result, nil
140+
}
141+
142+
func (d *PipelineCache) updateSortedPipelineKeys(key string, pipes []*bitflow.SamplePipeline) {
133143
// Maintain a sorted list of keys that lead to each pipeline
134144
for _, pipe := range pipes {
135145
if keys, ok := d.keys[pipe]; ok {
@@ -146,12 +156,6 @@ func (d *PipelineCache) getPipelines(key string, build PipelineBuildFunc) ([]Sub
146156
d.keys[pipe] = []string{key}
147157
}
148158
}
149-
result := make([]Subpipeline, len(pipes))
150-
for i, pipe := range pipes {
151-
result[i].Key = key
152-
result[i].Pipe = pipe
153-
}
154-
return result, nil
155159
}
156160

157161
func (d *PipelineCache) ContainedStringers() []fmt.Stringer {

bitflow/marshall_text.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ const (
2929
// in a key = value format. The width of the header line, the number of columns
3030
// in the table, and the spacing between the columns in the table can be configured.
3131
type TextMarshaller struct {
32-
3332
// TextWidths sets the width of the header line and value table.
3433
// If Columns > 0, this value is ignored as the width is determined by the
3534
// number of columns. If this is 0, the width will be determined automatically:
@@ -174,23 +173,29 @@ func (m TextMarshaller) writeHeader(header string, textWidth int, writer io.Writ
174173
}
175174

176175
func (m TextMarshaller) writeLines(lines []string, widths []int, writer io.Writer) error {
177-
columns := len(widths)
178176
for i, line := range lines {
179-
if _, err := writer.Write([]byte(line)); err != nil {
177+
if err := m.writeLine(line, i, lines, widths, writer); err != nil {
180178
return err
181179
}
182-
col := i % columns
183-
if col >= columns-1 || i == len(lines)-1 {
184-
if _, err := writer.Write([]byte("\n")); err != nil {
180+
}
181+
return nil
182+
}
183+
184+
func (m TextMarshaller) writeLine(line string, lineIndex int, lines []string, widths []int, writer io.Writer) error {
185+
if _, err := writer.Write([]byte(line)); err != nil {
186+
return err
187+
}
188+
col := lineIndex % len(widths)
189+
if col >= len(widths)-1 || lineIndex == len(lines)-1 {
190+
if _, err := writer.Write([]byte("\n")); err != nil {
191+
return err
192+
}
193+
} else {
194+
extraSpace := widths[col] - len(line)
195+
for j := 0; j < extraSpace; j++ {
196+
if _, err := writer.Write([]byte(" ")); err != nil {
185197
return err
186198
}
187-
} else {
188-
extraSpace := widths[col] - len(line)
189-
for j := 0; j < extraSpace; j++ {
190-
if _, err := writer.Write([]byte(" ")); err != nil {
191-
return err
192-
}
193-
}
194199
}
195200
}
196201
return nil

bitflow/pipeline.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (p *SamplePipeline) Construct(tasks *golib.TaskGroup) {
6464
tasks.Add(&ProcessorTaskWrapper{proc})
6565
}
6666
}
67-
tasks.Add(&SourceTaskWrapper{firstSource})
67+
tasks.Add(&SourceTaskWrapper{firstSource})
6868
}
6969

7070
// Add adds the SampleProcessor parameter to the list of SampleProcessors in the
@@ -74,11 +74,10 @@ func (p *SamplePipeline) Construct(tasks *golib.TaskGroup) {
7474
// pipeline.Add(processor1).Add(processor2)
7575
func (p *SamplePipeline) Add(processor SampleProcessor) *SamplePipeline {
7676
if p.lastProcessor != nil {
77-
if merger, ok := p.lastProcessor.(MergeableProcessor); ok {
78-
if merger.MergeProcessor(processor) {
79-
// Merge successful: drop the incoming step
80-
return p
81-
}
77+
merger, ok := p.lastProcessor.(MergeableProcessor)
78+
if ok && merger.MergeProcessor(processor) {
79+
// Merge successful: drop the incoming step
80+
return p
8281
}
8382
}
8483
p.lastProcessor = processor

0 commit comments

Comments
 (0)