Realtime and Go

My Name's Paddy

  • @paddyforan on Twitter
  • @paddy on App.net
  • Developer Experience Engineer, Iron.io
  • Second Bit
  • Gopher
Me. Aren't I pretty?

#goonaboat

Please be nice.

What is Go?

"A better C, from the guys that didn’t bring you C++"

  • Compiled
  • Static typed
  • Fast
  • Elegant
  • Concurrent

Compiled

... sanely.

$ go build

Static typed

But without the stuttering.

x := 1
y := "Hello, world!"
z := map[string]int{"a": 1, "b": 2}

Fast

When compiling and executing.

Elegant

Old programs read like quiet conversations between a well-spoken research worker and a well-studied mechanical colleague, not as a debate with a compiler.

— Dick Gabriel

Concurrent

Don't communicate by sharing memory; share memory by communicating.

Enough Talk

It's time for websocket-y action!

growup.goonaboat.com

What do you want to be when you grow up?

Let's pull back the curtain

$ gofmt --comments=false *.go | wc -l
216

This example was shamelessly "inspired" by Gary Burd's example for his wonderful library.

3 processes

  • History
  • WebSocket
  • Server
They have one job.

History

Those that /dev/null history are unable to repeat it.

The Structure of History

type history struct {
        messages [][]byte
        writer chan []byte
        reader chan chan []byte
}
I heard you like channels...

Paying Attention

func (h *history) listen() {
        for {
                select {
                case msg := <-h.writer:
                        h.messages = append(h.messages, msg)
                case resp := <-h.reader:
                        for _, msg := range h.messages {
                                resp <- msg
                        }
                        close(resp)
                }
        }
}

Sharing What You Know

func (h *history) dump() [][]byte {
        resp := make(chan []byte)
        h.reader <- resp
        result := make([][]byte, 0)
        for msg := range resp {
                result = append(result, msg)
        }
        return result
}

WebSocket

This is the proverbial traffic cop of the system.

Defining a Connection

type connection struct {
        ws   *websocket.Conn
        send chan []byte
}

Reading

func (c *connection) readPump() {
        defer func() {
                wsHub.unregister <- c
                c.ws.Close()
        }()
        c.ws.SetReadLimit(maxMessageSize)
        c.ws.SetReadDeadline(time.Now().Add(readWait))
        ....
}

Reading (continued)

...
for {
        op, r, err := c.ws.NextReader()
        if err != nil {
                break
        }
...
}

Reading (last one)

...
switch op {
case websocket.OpPong:
        c.ws.SetReadDeadline(time.Now().Add(readWait))
case websocket.OpText:
        message, err := ioutil.ReadAll(r)
        if err != nil {
                break
        }
        wsHub.broadcast <- message
        msgHistory.writer <- message
}

Writing A Single Message

func (c *connection) write(opCode int, payload []byte) error {
        c.ws.SetWriteDeadline(time.Now().Add(writeWait))
        return c.ws.WriteMessage(opCode, payload)
}

Writing

func (c *connection) writePump() {
        ticker := time.NewTicker(pingPeriod)
        defer func() {
                ticker.Stop()
                c.ws.Close()
        }()
        for {
                ...
        }
}

Writing (continued)

select {
case message, ok := <-c.send:
    if !ok {
           c.write(websocket.OpClose, []byte{})
           return
    }
    if err := c.write(websocket.OpText, message); err != nil {
           return
    }
case <-ticker.C:
    if err := c.write(websocket.OpPing, []byte{}); err != nil {
           return
    }
}

Upgrading

func serveWs(w http.ResponseWriter, r *http.Request) {
    ws, err := websocket.Upgrade(w, r.Header, nil, 1024, 1024)
    if _, ok := err.(websocket.HandshakeError); ok {
            http.Error(w, "Not a websocket handshake", 400)
            return
    } else if err != nil {
            log.Println(err)
            return
    }
    c := &connection{send: make(chan []byte, 256), ws: ws}
    wsHub.register <- c
    go c.writePump()
    c.readPump()
}

The Hub

type hub struct {
        connections map[*connection]bool
        broadcast   chan []byte
        register    chan *connection
        unregister  chan *connection
}

var wsHub = hub{
        broadcast:   make(chan []byte),
        register:    make(chan *connection),
        unregister:  make(chan *connection),
        connections: make(map[*connection]bool),
}

The Hub (continued)

func (h *hub) run() {
        for {
                select {
                case c := <-h.register:
                        h.connections[c] = true
                case c := <-h.unregister:
                        delete(h.connections, c)
                        close(c.send)
                case m := <-h.broadcast:
                        for c := range h.connections {
                                select {
                                case c.send <- m:
                                default:
                                        close(c.send)
                                        delete(h.connections, c)

Server

A glorified Javascript delivery mechanism.

Flags & Globals

var addr = flag.String("addr", ":8080", "Address to listen on")
var file = flag.String("file", "growup.html", "Template to load")
var temp *template.Template
var msgHistory = &history{
        messages: make([][]byte, 0),
        writer: make(chan []byte),
        reader: make(chan chan []byte),
}

Template Data

type TemplateData struct {
        Host string
        History []string
}

Serving Requests

func serve(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/html; charset=utf-8")
    msgs := msgHistory.dump()
    tempMsgs := make([]string, len(msgs))
    for _, msg := range msgs {
            tempMsgs = append(tempMsgs, string(msg))
    }
    data := TemplateData{Host: r.Host, History: tempMsgs}
    err := temp.Execute(w, data)
    if err != nil {
            log.Println("temp.Execute: ", err)
            return
    }
}

Pulling it all together

func main() {
        flag.Parse()
        temp = template.Must(template.ParseFiles(*file))
        go wsHub.run()
        go msgHistory.listen()
        http.HandleFunc("/", serve)
        http.HandleFunc("/ws", serveWs)
        err := http.ListenAndServe(*addr, nil)
        if err != nil {
                log.Fatal("ListenAndServe: ", err)
        }
}

That's it. In its entirety.

github.com/paddyforan/goonaboat

So Let's Recap

Mainly for the people that spent the talk heckling me on Twitter.

Goroutines are Elegant

Cheap concurrency is never a bad thing.

Channels are Awesome

Because goroutines like to communicate.

WebSockets are Magic

I mean, really.

The Gopher is Adorable

Just look at him!

Go + Realtime = ♥

Like unicorns and magic, they belong together.

You're Going to Check Out Go

Thank You