Merge pull request #31 from vladimirvivien/big-refactor
Big Refactor, Big Redesign

These commits completely redesign Automi. Please see README.md and docs/automi.md for details.
vladimirvivien authored Jul 31, 2019
2 parents 43ecd29 + ad36cd6 commit 464abb8
Showing 136 changed files with 15,702 additions and 2,348 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -3,3 +3,4 @@ bin
 src
 ex0
 result.txt
+vendor
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,6 +1,6 @@
 language: go
 go:
-  - 1.7.4
+  - "1.12.x"
 notifications:
   email:
     on_success: change
350 changes: 271 additions & 79 deletions README.md
@@ -1,103 +1,295 @@
<h1 align="center">
<img src="./docs/automi_logo.png" alt="automi">
</h1>

<h4 align="center">A data stream processing API for Go (alpha)</h4>
<br/>

[![GoDoc](https://godoc.org/github.com/vladimirvivien/automi?status.svg)](https://godoc.org/github.com/vladimirvivien/automi)
[![Build Status](https://travis-ci.org/vladimirvivien/automi.svg)](https://travis-ci.org/vladimirvivien/automi)

Automi is an API for processing streams of data using idiomatic Go. Using Automi, programs can process streaming data by composing stages of operations that are applied to each element of the stream.

## Concept

<h1 align="center">
<img src="./docs/streaming.png" alt="Automi streaming concepts">
</h1>

<br/>

The Automi API expresses a stream with four primitives:

- *An emitter*: an in-memory, network, or file resource that can emit elements for streaming
- *The stream*: a conduit within which data elements are streamed
- *Stream operations*: code which can be attached to the stream to process streamed elements
- *A collector*: an in-memory, network, or file resource that can collect streamed data

Automi streams use Go channels internally to route data. This means Automi streams automatically support features such as buffering, automatic back-pressure queuing, and concurrency safety.
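
For a sense of what Automi abstracts, here is a minimal hand-rolled sketch (plain Go, no Automi) of a single emitter → operator → collector pipeline; Automi builds and coordinates this kind of plumbing for you:

```go
package main

import "fmt"

func main() {
	// emitter: a goroutine pushes elements into a channel
	src := make(chan rune)
	go func() {
		defer close(src)
		for _, r := range "Hello, World" {
			src <- r
		}
	}()

	// operator: a filter stage that keeps only uppercase ASCII letters
	filtered := make(chan rune)
	go func() {
		defer close(filtered)
		for r := range src {
			if r >= 'A' && r <= 'Z' {
				filtered <- r
			}
		}
	}()

	// collector: drain the final channel
	for r := range filtered {
		fmt.Println(string(r))
	}
}
```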

## Using Automi

Now, let us explore some examples to see how easy it is to use Automi to stream and process data.

> See all examples in the [./examples](./examples) directory.

### Example: streaming from a slice into stdout

This first example shows how easy it is to compose and express stream operations with Automi. In this example, rune values are emitted from a slice and are streamed individually. The `Filter` operator method drops unwanted rune values, and the `Sort` operator method sorts the remaining items. Lastly, a collector collects the result into an `io.Writer`, which pipes it to `stdout`.

```go
func main() {
	strm := stream.New([]rune("B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw"))

	strm.Filter(func(item rune) bool {
		return item >= 65 && item < (65+26)
	}).Map(func(item rune) string {
		return string(item)
	}).Batch().Sort()
	strm.Into(collectors.Writer(os.Stdout))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
		return
	}
}
```
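
When this program runs, only the uppercase ASCII letters in the slice survive the filter, so the collected output should be the sorted sequence A, B, C, D, O, P, Q, R, S, T, V, W, X, Y, Z, written to `stdout` by the `io.Writer` collector.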

> See the full [source code](./examples/emitters/slice0).

##### How it works

1. Create the stream with an emitter source. Automi supports several types of sources including channels, io.Reader, slices, etc. (see list of emitters below). Each element in the slice will be streamed individually.

```go
strm := stream.New([]rune(`B世!ぽ@opqDQRS#$%^&*()ᅖ...O6PTnVWXѬYZbcef7ghijCklrAstvw`))
```


2. Apply user-provided or built-in stream operations as shown below:

```go
strm.Filter(func(item rune) bool {
	return item >= 65 && item < (65+26)
}).Map(func(item rune) string {
	return string(item)
}).Batch().Sort()
```

3. Collect the result. In this example, the result is collected into an `io.Writer` which further streams the data into standard output:

```go
strm.Into(collectors.Writer(os.Stdout))
```

4. Lastly, open the stream once it is properly composed:

```go
if err := <-strm.Open(); err != nil {
	fmt.Println(err)
	return
}
```
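
As noted in step 1, emitters are not limited to slices; a Go channel can also act as the stream source. Here is a minimal sketch, assuming `stream.New` accepts a channel directly (see the emitter list below), with imports shown for completeness:

```go
package main

import (
	"fmt"
	"os"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/stream"
)

func main() {
	// emitter: a channel fed by a separate goroutine
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, word := range []string{"streams", "over", "channels"} {
			ch <- word
		}
	}()

	strm := stream.New(ch)
	strm.Into(collectors.Writer(os.Stdout))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
	}
}
```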

### Example: streaming from an `io.Reader` into collector function

The next example shows how to use Automi to stream data from an `io.Reader`, emitting buffered string values from an in-memory source in 50-byte chunks. The data is processed with the `Map` and `Filter` operator methods, and the result is sent to a user-provided collector function that prints it.

```go
func main() {
	data := `"request", "/i/a", "00:11:51:AA", "accepted"
"response", "/i/a/", "00:11:51:AA", "served"
"response", "/i/a", "00:BB:22:DD", "served"...`

	reader := strings.NewReader(data)

	// create stream from a buffered io.Reader emitter,
	// emitting 50-byte chunks.
	stream := stream.New(emitters.Reader(reader).BufferSize(50))
	stream.Map(func(chunk []byte) string {
		return string(chunk)
	})
	stream.Filter(func(e string) bool {
		return strings.Contains(e, `"response"`)
	})
	stream.Into(collectors.Func(func(data interface{}) error {
		e := data.(string)
		fmt.Println(e)
		return nil
	}))

	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		return
	}
}
```

> See complete example [here](./examples/emitters/reader/emitreader.go).

### Example: streaming using CSV files

The following example streams data from a CSV source file. Each row is mapped to a custom type, filtered, then mapped to a slice of strings, which is collected into another CSV file.

```go
type scientist struct {
	FirstName string
	LastName  string
	Title     string
	BornYear  int
}

func main() {
	// creates a stream using a CSV emitter;
	// emits each row as []string
	stream := stream.New("./data.txt")

	// Map each CSV row, []string, to type scientist
	stream.Map(func(cs []string) scientist {
		yr, _ := strconv.Atoi(cs[3])
		return scientist{
			FirstName: cs[1],
			LastName:  cs[0],
			Title:     cs[2],
			BornYear:  yr,
		}
	})
	stream.Filter(func(cs scientist) bool {
		return cs.BornYear > 1930
	})
	stream.Map(func(cs scientist) []string {
		return []string{cs.FirstName, cs.LastName, cs.Title}
	})
	stream.Into("./result.txt")

	// open the stream
	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	fmt.Println("wrote result to file result.txt")
}
```

> See complete example [here](./examples/customtype/process.go).
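
For reference, the `Map` function above reads each row as `[last name, first name, title, born year]`. A hypothetical `./data.txt` might therefore look like the rows below (the real data file ships with the example); note that only rows with a birth year after 1930 pass the `Filter` and reach `result.txt`:

```
Lovelace,Ada,Mathematician,1815
Hopper,Grace,Computer Scientist,1906
Berners-Lee,Tim,Computer Scientist,1955
```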
### Example: streaming HTTP requests and responses
The following example shows how to use Automi to stream and process data using HTTP requests and responses. This HTTP server program streams data from the request body, encodes it using base64, and streams the result into the HTTP response:

```go
func main() {
	http.HandleFunc(
		"/",
		func(resp http.ResponseWriter, req *http.Request) {
			resp.Header().Add("Content-Type", "text/html")
			resp.WriteHeader(http.StatusOK)

			strm := stream.New(req.Body)
			strm.Process(func(data []byte) string {
				return base64.StdEncoding.EncodeToString(data)
			}).Into(resp)

			if err := <-strm.Open(); err != nil {
				resp.WriteHeader(http.StatusInternalServerError)
				log.Printf("Stream error: %s", err)
			}
		},
	)

	log.Println("Server listening on :4040")
	http.ListenAndServe(":4040", nil)
}
```
> See complete example [here](./examples/net/http/httpsvr.go).
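
To exercise the server, a client can POST any payload and read back its base64 encoding. Below is a minimal client sketch using only the standard library (the address matches the server above; the payload string is hypothetical):

```go
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strings"
)

func main() {
	// POST a payload to the streaming server started above
	resp, err := http.Post("http://localhost:4040/", "text/plain", strings.NewReader("hello automi"))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// the response body carries the base64-encoded payload
	encoded, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(encoded))
}
```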
### Streaming gRPC service payload
The following example shows how to use Automi to stream data items from a gRPC streaming service. The following gRPC client sets up an Automi emitter to emit time values that are streamed from a gRPC time service:

```go
// set up an Automi emitter function to stream from the gRPC service
func emitStreamFrom(client pb.TimeServiceClient) <-chan []byte {
	source := make(chan []byte)
	timeStream, err := client.GetTimeStream(context.Background(), &pb.TimeRequest{Interval: 3000})
	...
	go func(stream pb.TimeService_GetTimeStreamClient, srcCh chan []byte) {
		defer close(srcCh)
		for {
			t, err := stream.Recv()
			if err != nil {
				return
			}
			srcCh <- t.Value
		}
	}(timeStream, source)

	return source
}

func main() {
	...
	client := pb.NewTimeServiceClient(conn)

	// create automi stream from the gRPC emitter channel
	stream := stream.New(emitStreamFrom(client))
	stream.Map(func(item []byte) time.Time {
		secs := int64(binary.BigEndian.Uint64(item))
		return time.Unix(secs, 0)
	})
	stream.Into(collectors.Func(func(item interface{}) error {
		t := item.(time.Time)
		fmt.Println(t)
		return nil
	}))

	// open the stream and wait for completion
	if err := <-stream.Open(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}
```
> See complete example [here](./examples/grpc).

## More Examples
[Examples](./examples) - View a long list of examples that cover all aspects of using Automi.

## Automi components
Automi comes with a set of built-in components to get you started with stream processing, including the following.

### Emitters

* `Channel`
* `CSV`
* `io.Reader`
* `io.Scanner`
* `Slice`

### Operators
* `Stream.Filter`
* `Stream.Map`
* `Stream.FlatMap`
* `Stream.Reduce`
* `Stream.GroupByKey`
* `Stream.GroupByName`
* `Stream.GroupByPos`
* `Stream.Sort`
* `Stream.SortByKey`
* `Stream.SortByName`
* `Stream.SortByPos`
* `Stream.SortWith`
* `Stream.Sum`
* `Stream.SumByKey`
* `Stream.SumByName`
* `Stream.SumByPos`
* `Stream.SumAllKeys`
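
As an illustration of the grouping operators, here is a minimal sketch that groups struct values by a field name. It assumes, following the `Batch().Sort()` pattern in the first example, that grouping applies to batched items, and it prints the grouped result as-is since its concrete type is not documented here:

```go
package main

import (
	"fmt"

	"github.com/vladimirvivien/automi/collectors"
	"github.com/vladimirvivien/automi/stream"
)

type vehicle struct {
	Kind  string
	Model string
}

func main() {
	strm := stream.New([]vehicle{
		{Kind: "car", Model: "Atom"},
		{Kind: "truck", Model: "Tundra"},
		{Kind: "car", Model: "Elise"},
	})

	// batch the streamed items, then group them by the struct field "Kind"
	strm.Batch().GroupByName("Kind")

	strm.Into(collectors.Func(func(item interface{}) error {
		fmt.Println(item) // print the grouped batch as-is
		return nil
	}))

	if err := <-strm.Open(); err != nil {
		fmt.Println(err)
	}
}
```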

### Collectors

* `CSV`
* `Func`
* `Null`
* `Slice`
* `Writer`

## Licence
Apache 2.0