hackajob Insider

What The Tech?!: NATS and all things Golang

Written by hackajob Staff | Sep 22, 2021 11:00:00 PM

If you're a Go programmer, knowing how to use NATS from Go is a valuable skill. NATS is an open-source messaging system whose server is written in the Go programming language. If you want to know more about Go and whether you should learn it, have no fear: we've already covered that in our post here.

First, let's talk about what NATS is.

What is NATS

The official NATS documentation describes it as follows:

NATS is a connective technology that powers modern distributed
systems. A connective technology is responsible for addressing and
discovery and exchanging of messages that drive the common patterns in
distributed systems; asking and answering questions, aka
services/microservices, and making and processing statements, or
stream processing.

NATS messaging enables the exchange of data that is segmented into
messages among computer applications and services. These messages are
addressed by subjects and do not depend on network location. This
provides an abstraction layer between the application or service and
the underlying physical network. Data is encoded and framed as a
message and sent by a publisher. The message is received, decoded, and
processed by one or more subscribers.

In addition to these definitions, we can add the following information:

  • It was developed by Derek Collison in 2010, specifically for Cloud Foundry.

  • The tool was originally written in Ruby and has been written in Go since 2012.

  • It has been actively used in production environments for about 8 years.

  • NATS Server v1.0.0 was released as stable in 2017.

  • It was accepted as an incubating project by the Cloud Native Computing Foundation on March 15, 2018. Development currently continues under Synadia and its open-source ecosystem.

NATS Components

NATS basically consists of two components: the server and the client.

NATS Server: A lightweight server written in Go that forms the basis of the communication channel.

NATS Client: Clients are the units that connect to the NATS Server to send and receive data. NATS has client libraries for almost all programming languages. The Go, Node, Ruby, Java, C, C#, and NGINX C libraries are developed by the NATS team itself.

Okay, great, let's look more closely at the types of servers. There are two types of NATS servers: NATS Server and NATS Streaming.

The difference between them comes down to how they deliver messages, so let's look at the qualities of service NATS offers:

Multiple Quality of Service (Multi QoS)

  • QoS refers to technologies used to reduce defects such as packet loss, latency, and jitter in the network.

  • Two delivery modes are supported to improve the quality of service over NATS.

At-most-once delivery: This is the mode implemented by NATS Server. Delivery is not enforced: if a client is not connected when a message is sent, it will not receive that message.

At-least-once delivery: In this mode, implemented by NATS Streaming, the server keeps the message and keeps trying to deliver it to clients until one of the following conditions is met:

  • A subscriber acknowledges that it has received the message

  • The message expires (timeout)

  • The server runs out of memory
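To make the at-least-once idea concrete, here is a minimal in-memory sketch. It is not how NATS Streaming is implemented: the Store type and its methods are hypothetical names invented for this illustration, and expiry and memory limits are left out. It only shows the core rule that a message stays on the server until a subscriber acknowledges it.

```go
package main

import "fmt"

// pending is one message the toy server still owes to a subscriber.
type pending struct {
	seq  int
	data string
}

// Store is a hypothetical at-least-once message store: it keeps every
// message until it has been acknowledged.
type Store struct {
	nextSeq int
	unacked []pending
}

// Publish stores the message whether or not a subscriber is connected.
func (s *Store) Publish(data string) {
	s.nextSeq++
	s.unacked = append(s.unacked, pending{seq: s.nextSeq, data: data})
}

// Deliver offers every unacknowledged message to handle. Messages for which
// handle returns true (an ACK) are removed; the rest stay queued and are
// offered again on the next call.
func (s *Store) Deliver(handle func(seq int, data string) bool) {
	kept := s.unacked[:0]
	for _, m := range s.unacked {
		if !handle(m.seq, m.data) {
			kept = append(kept, m)
		}
	}
	s.unacked = kept
}

// Pending reports how many messages still await an ACK.
func (s *Store) Pending() int { return len(s.unacked) }

func main() {
	s := &Store{}
	s.Publish("first") // no subscriber yet; the message is retained
	s.Publish("second")

	// First attempt: the subscriber acknowledges only message 1.
	s.Deliver(func(seq int, data string) bool {
		fmt.Printf("attempt 1: seq=%d data=%s\n", seq, data)
		return seq == 1
	})
	fmt.Println("pending after attempt 1:", s.Pending())

	// Second attempt: message 2 is redelivered and acknowledged.
	s.Deliver(func(seq int, data string) bool {
		fmt.Printf("attempt 2: seq=%d data=%s\n", seq, data)
		return true
	})
	fmt.Println("pending after attempt 2:", s.Pending())
}
```

With at-most-once delivery there would be no unacked list at all: a message published while nobody is listening is simply gone.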

Finally, let's briefly talk about how Pub/Sub works in NATS and move on to the code part:

NATS is built on the Publisher/Subscriber pattern. In this pattern, some clients publish messages to a channel, while other clients subscribe to that channel and wait for messages.
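The pattern can be sketched in a few lines of plain Go. This is only a toy model under our own assumptions: Bus, Subscribe, and Publish here are hypothetical names for an in-process stand-in, not the NATS client API, and there is no network involved.

```go
package main

import "fmt"

// Handler processes the payload of one message.
type Handler func(data []byte)

// Bus is a toy, in-memory stand-in for a message server (hypothetical type).
type Bus struct {
	subs map[string][]Handler // subject -> subscribers
}

// NewBus returns an empty bus.
func NewBus() *Bus { return &Bus{subs: make(map[string][]Handler)} }

// Subscribe registers interest in a subject.
func (b *Bus) Subscribe(subject string, h Handler) {
	b.subs[subject] = append(b.subs[subject], h)
}

// Publish delivers data to every current subscriber of the subject and
// returns how many received it. With no subscribers the message is dropped.
func (b *Bus) Publish(subject string, data []byte) int {
	for _, h := range b.subs[subject] {
		h(data)
	}
	return len(b.subs[subject])
}

func main() {
	bus := NewBus()
	bus.Subscribe("orders", func(d []byte) { fmt.Println("A got:", string(d)) })
	bus.Subscribe("orders", func(d []byte) { fmt.Println("B got:", string(d)) })

	fmt.Println("delivered to", bus.Publish("orders", []byte("hello")))  // delivered to 2
	fmt.Println("delivered to", bus.Publish("payments", []byte("lost"))) // delivered to 0
}
```

Note that the publisher only names a subject; it never needs to know who the subscribers are or where they run.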

It also supports the Request/Reply and Queueing patterns:


[Figure: pub/sub]

[Figure: queueing]

[Figure: request/reply]
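The queueing and request/reply patterns can also be sketched with a toy in-memory broker. Everything here (Bus, Msg, the "_INBOX." prefix, round-robin selection) is an assumption made for illustration. In particular, the sketch picks queue members in turn to keep the output predictable, whereas NATS chooses a member of the group at random.

```go
package main

import "fmt"

// Msg carries a payload and, for requests, the subject to reply on.
type Msg struct {
	Data  string
	Reply string
}

// Handler processes one message.
type Handler func(m Msg)

// Bus is a toy in-memory broker (hypothetical, not the NATS implementation).
type Bus struct {
	subs   map[string][]Handler            // plain subscribers per subject
	queues map[string]map[string][]Handler // subject -> queue group -> members
	next   map[string]int                  // round-robin cursor per subject/group
}

// NewBus returns an empty broker.
func NewBus() *Bus {
	return &Bus{
		subs:   map[string][]Handler{},
		queues: map[string]map[string][]Handler{},
		next:   map[string]int{},
	}
}

// Subscribe adds a plain subscriber; it receives every message on subject.
func (b *Bus) Subscribe(subject string, h Handler) {
	b.subs[subject] = append(b.subs[subject], h)
}

// QueueSubscribe adds h to a queue group; each message reaches one member.
func (b *Bus) QueueSubscribe(subject, group string, h Handler) {
	if b.queues[subject] == nil {
		b.queues[subject] = map[string][]Handler{}
	}
	b.queues[subject][group] = append(b.queues[subject][group], h)
}

// Publish sends m to all plain subscribers and to one member of each group.
func (b *Bus) Publish(subject string, m Msg) {
	for _, h := range b.subs[subject] {
		h(m)
	}
	for group, members := range b.queues[subject] {
		key := subject + "/" + group
		i := b.next[key] % len(members)
		b.next[key]++
		members[i](m)
	}
}

// Request publishes data with a one-off reply subject and returns the answer.
// It works synchronously here because handlers run inline.
func (b *Bus) Request(subject, data string) string {
	inbox := "_INBOX." + subject
	var answer string
	b.Subscribe(inbox, func(m Msg) { answer = m.Data })
	b.Publish(subject, Msg{Data: data, Reply: inbox})
	delete(b.subs, inbox)
	return answer
}

func main() {
	bus := NewBus()
	for _, name := range []string{"w1", "w2"} {
		name := name
		bus.QueueSubscribe("jobs", "workers", func(m Msg) {
			fmt.Println(name, "handles", m.Data)
		})
	}
	for _, job := range []string{"a", "b", "c"} {
		bus.Publish("jobs", Msg{Data: job})
	}

	// A responder answers on whatever reply subject the request carries.
	bus.Subscribe("time", func(m Msg) {
		bus.Publish(m.Reply, Msg{Data: "reply to " + m.Data})
	})
	fmt.Println(bus.Request("time", "now?"))
}
```

Queue groups are what we rely on later in this tutorial when we call QueueSubscribe: several instances of the same service can share one group name and split the work between them.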

Now let's get to the NATS code itself. In this tutorial, we'll work with NATS Streaming Server, also known as STAN. First of all, we need to install NATS Streaming in the environment we are working in.

To keep the article short, we won't cover the installation here; you can install it by following:
https://docs.nats.io/nats-streaming-server/install

Now let's start our stan connection. We'll use stan's own library, which you can get from here: github.com/nats-io/stan.go

func ConnectStan(clientID string) {
	clusterID := "test-cluster"    // NATS Streaming cluster ID
	url := "nats://127.0.0.1:4222" // NATS server URL

	// You can set the client ID to any value you like.
	sc, err := stan.Connect(clusterID, clientID, stan.NatsURL(url),
		stan.Pings(1, 3), // ping interval of 1 second, maxOut of 3
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			log.Fatalf("Connection lost, reason: %v", reason)
		}))
	if err != nil {
		log.Fatalf("Can't connect: %v.\nMake sure a NATS Streaming Server is running at: %s", err, url)
	}

	log.Println("Connected Nats")

	// Sc is a package-level stan.Conn variable (declared in the full listing).
	Sc = sc
}

We passed a few options to the stan.Connect function along with the clusterID and clientID. The stan.Pings option configures how the client pings the server to check that it is still up.
Its description says exactly:

Pings is an Option to set the ping interval and max out values. The
interval needs to be at least 1 and represents the number of seconds.
The maxOut needs to be at least 2, since the count of sent PING
increases whenever a PING is sent and reset to 0 when a response is
received. Setting to 1 would cause the library to close the connection
right away.

We pass 1 and 3 to comply with the rules written here: an interval of 1 second and a maxOut of 3. In addition, we set a SetConnectionLostHandler. As the name suggests, this handler is called when the connection is lost, so you can decide what should happen in that case. Here, we want the program to exit with a fatal error. That completes our connection function.
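The bookkeeping described in that quote can be sketched as a small simulation. This is our own reading of the description, not code from the library: ticksUntilLost is a hypothetical helper, and we assume the connection is dropped as soon as the count of sent PINGs reaches maxOut, which matches the remark that a value of 1 would close the connection right away. The library's exact check may differ.

```go
package main

import "fmt"

// ticksUntilLost simulates the PING counter. Each element of answered stands
// for one ping interval: true means the server's response arrived before the
// next PING was due. It returns the 1-based tick at which the connection is
// declared lost, or 0 if it survives all ticks.
//
// Assumption: the connection is dropped once the count of sent PINGs reaches
// maxOut; the real library's comparison may differ.
func ticksUntilLost(answered []bool, maxOut int) int {
	outstanding := 0
	for i, ok := range answered {
		outstanding++ // a PING is sent on every tick
		if outstanding >= maxOut {
			return i + 1
		}
		if ok {
			outstanding = 0 // a response resets the counter
		}
	}
	return 0
}

func main() {
	down := []bool{false, false, false, false}
	healthy := []bool{true, true, true, true}

	fmt.Println(ticksUntilLost(down, 3))         // 3: lost after three intervals
	fmt.Println(ticksUntilLost(healthy, 3))      // 0: never lost
	fmt.Println(ticksUntilLost([]bool{true}, 1)) // 1: maxOut of 1 closes right away
}
```

Under this model, Pings(1, 3) means an unreachable server is noticed after roughly three seconds.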

Now let's write our function that publishes the message we want to send to the channel:

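func PublishNats(data []byte, channel string) {
	// ach is called later with the message GUID and the result of the ack.
	ach := func(guid string, err error) {
		if err != nil {
			log.Printf("Message %s was not acknowledged: %v", guid, err)
		}
	}
	_, err := Sc.PublishAsync(channel, data, ach)
	if err != nil {
		log.Fatalf("Error during async publish: %v\n", err)
	}
}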

We want to send the message asynchronously, so we use the PublishAsync function.
We pass the message we want to send and the channel we want to send it to as parameters. It also takes an AckHandler as a parameter. The AckHandler is used in async publishing to report the status of the ack.
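One practical consequence of async publishing is that the program has to wait for the acks before it can know that everything was stored. The sketch below shows one way to do that with a sync.WaitGroup. To keep it runnable without a server, it uses fakeConn, a hypothetical stand-in we made up whose PublishAsync has the same shape as the one used in this article; it rejects empty payloads purely to simulate a failed ack. We also assume the handler is not invoked when PublishAsync itself returns an error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// AckHandler receives the message GUID and an error that is nil when the
// server stored the message.
type AckHandler func(guid string, err error)

// fakeConn is a hypothetical stand-in for a streaming connection so that the
// example runs without a server.
type fakeConn struct {
	mu   sync.Mutex
	next int
}

// PublishAsync returns a GUID immediately and reports the outcome later by
// calling ah from another goroutine. Empty payloads are rejected to simulate
// a failed ack.
func (c *fakeConn) PublishAsync(subject string, data []byte, ah AckHandler) (string, error) {
	c.mu.Lock()
	c.next++
	guid := fmt.Sprintf("guid-%d", c.next)
	c.mu.Unlock()

	go func() {
		if len(data) == 0 {
			ah(guid, errors.New("empty payload"))
			return
		}
		ah(guid, nil)
	}()
	return guid, nil
}

// publishAll sends every payload and blocks until each ack has arrived.
// It returns the GUIDs whose acks carried an error.
func publishAll(c *fakeConn, subject string, payloads [][]byte) []string {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		failed []string
	)
	for _, p := range payloads {
		wg.Add(1)
		_, err := c.PublishAsync(subject, p, func(guid string, err error) {
			defer wg.Done()
			if err != nil {
				mu.Lock()
				failed = append(failed, guid)
				mu.Unlock()
			}
		})
		if err != nil {
			wg.Done() // assumption: no handler call follows a failed send
		}
	}
	wg.Wait()
	return failed
}

func main() {
	c := &fakeConn{}
	failed := publishAll(c, "test", [][]byte{[]byte("a"), nil, []byte("c")})
	fmt.Println("failed acks:", failed) // failed acks: [guid-2]
}
```

The same waiting pattern should carry over to a real connection, but treat that as something to verify against the library rather than a guarantee.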

The AckHandler is passed the GUID and any error state. No error means the message was successfully received by NATS Streaming. That covers publishing; now let's look at the side that will receive the message:

func PrintMessage(subject, qgroup, durable string) {
	// mcb is called for every message delivered to this subscriber.
	mcb := func(msg *stan.Msg) {
		// Manual ack mode is enabled below, so we acknowledge explicitly.
		if err := msg.Ack(); err != nil {
			log.Printf("failed to ACK msg: %v", err)
		}

		fmt.Println(string(msg.Data))
	}

	_, err := Sc.QueueSubscribe(subject,
		qgroup, mcb,
		stan.DeliverAllAvailable(), // start with the oldest available message
		stan.SetManualAckMode(),
		stan.DurableName(durable))
	if err != nil {
		log.Println(err)
	}
}

We subscribe to the channel we published to above, wait for messages from it, and print incoming messages to the screen.
With DurableName, we give the subscription a durable name. If the client crashes, stan remembers our position in the channel. When we subscribe again with the same durable name, it continues to deliver our data from where we left off. That's why the durable name is so important.

This is how stan guarantees that the data is delivered to the other party. Additionally, you can specify in the NATS config file whether data is kept in memory or on disk.
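The idea behind a durable name can be modelled as a per-name cursor into the channel. The following is a simplified sketch under our own assumptions, not the server's actual bookkeeping: Channel and its methods are hypothetical, every delivered message counts as acknowledged, and max is assumed to be non-negative.

```go
package main

import "fmt"

// Channel is a toy message log that remembers, per durable name, how many
// messages have already been acknowledged (hypothetical type).
type Channel struct {
	msgs    []string
	lastAck map[string]int // durable name -> count of acknowledged messages
}

// NewChannel returns an empty channel.
func NewChannel() *Channel { return &Channel{lastAck: map[string]int{}} }

// Publish appends a message to the log.
func (c *Channel) Publish(data string) { c.msgs = append(c.msgs, data) }

// Subscribe returns up to max messages that the durable has not acknowledged
// yet and records them as acknowledged. A name seen for the first time starts
// at the beginning of the log.
func (c *Channel) Subscribe(durable string, max int) []string {
	start := c.lastAck[durable]
	end := start + max
	if end > len(c.msgs) {
		end = len(c.msgs)
	}
	c.lastAck[durable] = end
	return append([]string(nil), c.msgs[start:end]...)
}

func main() {
	ch := NewChannel()
	ch.Publish("m1")
	ch.Publish("m2")
	ch.Publish("m3")

	// The client processes two messages, then "crashes".
	fmt.Println(ch.Subscribe("test-1", 2)) // [m1 m2]

	ch.Publish("m4") // published while the client is away

	// Same durable name: delivery resumes where it stopped.
	fmt.Println(ch.Subscribe("test-1", 10)) // [m3 m4]

	// A different durable name starts from the beginning.
	fmt.Println(ch.Subscribe("other", 10)) // [m1 m2 m3 m4]
}
```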

Now let's combine all the code and see it together:

package main

import (
	"fmt"
	"github.com/nats-io/stan.go"
	"log"
)

var (
	Sc stan.Conn
)

func main() {
	quit := make(chan struct{}) // never closed; keeps the program running
	ConnectStan("nats-example")

	data := []byte("this is a test message")
	PublishNats(data, "test")

	PrintMessage("test", "test", "test-1")
	<-quit // block forever so the subscriber keeps receiving
}

func ConnectStan(clientID string) {
	clusterID := "test-cluster" // nats cluster id
	url := "nats://127.0.0.1:4222" // nats url

	// you can set client id anything
	sc, err := stan.Connect(clusterID, clientID, stan.NatsURL(url),
		stan.Pings(1, 3),
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			log.Fatalf("Connection lost, reason: %v", reason)
		}))
	if err != nil {
		log.Fatalf("Can't connect: %v.\nMake sure a NATS Streaming Server is running at: %s", err, url)
	}

	log.Println("Connected Nats")

	Sc = sc
}

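func PublishNats(data []byte, channel string) {
	// ach is called later with the message GUID and the result of the ack.
	ach := func(guid string, err error) {
		if err != nil {
			log.Printf("Message %s was not acknowledged: %v", guid, err)
		}
	}
	_, err := Sc.PublishAsync(channel, data, ach)
	if err != nil {
		log.Fatalf("Error during async publish: %v\n", err)
	}
}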

func PrintMessage(subject, qgroup, durable string) {
	mcb := func(msg *stan.Msg) {
		if err := msg.Ack(); err != nil {
			log.Printf("failed to ACK msg:%v", err)
		}

		fmt.Println(string(msg.Data))
	}

	_, err := Sc.QueueSubscribe(subject,
		qgroup, mcb,
		stan.DeliverAllAvailable(),
		stan.SetManualAckMode(),
		stan.DurableName(durable))
	if err != nil {
		log.Println(err)
	}
}

Since this is sample code, we did not split it into files. Our aim is to understand how the system works. In ConnectStan, we assigned the connection object to a global stan.Conn variable and used it in the other functions. We sent our message with PublishNats, then received it with the PrintMessage function and printed it on the screen.

If you want to test the NATS Streaming Server's ability to queue messages and hold them until the client connects again, comment out the PrintMessage call, then run and close the program a few times.
Then uncomment the line and run it again. In this run, you will see that PrintMessage prints all the messages that you sent in the earlier runs but that the client could not receive.

Messages held in the queue are deleted to make room for new ones when the channel message limit is reached. You can change this limit in the NATS config file, or set a time after which messages are deleted, if you want:


[Figure: config file]
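The two kinds of limit mentioned above, a maximum message count and a maximum age, can be illustrated with a small model. This is a sketch under our own assumptions, not the server's store: LimitedChannel, MaxMsgs, and MaxAgeSec are hypothetical names, and time is passed in as plain seconds so the behaviour is easy to follow.

```go
package main

import "fmt"

// stored is one retained message with the time (in seconds) it was published.
type stored struct {
	data string
	at   int64
}

// LimitedChannel is a hypothetical message log with a count limit and an age
// limit. A value of 0 disables the corresponding limit.
type LimitedChannel struct {
	MaxMsgs   int
	MaxAgeSec int64
	msgs      []stored
}

// Publish appends a message stamped with now, then drops the oldest messages
// until the count limit is respected.
func (c *LimitedChannel) Publish(data string, now int64) {
	c.msgs = append(c.msgs, stored{data: data, at: now})
	if c.MaxMsgs > 0 && len(c.msgs) > c.MaxMsgs {
		c.msgs = c.msgs[len(c.msgs)-c.MaxMsgs:]
	}
}

// Expire removes messages older than MaxAgeSec relative to now.
func (c *LimitedChannel) Expire(now int64) {
	if c.MaxAgeSec <= 0 {
		return
	}
	i := 0
	for i < len(c.msgs) && now-c.msgs[i].at > c.MaxAgeSec {
		i++
	}
	c.msgs = c.msgs[i:]
}

// Contents lists the retained payloads, oldest first.
func (c *LimitedChannel) Contents() []string {
	out := make([]string, 0, len(c.msgs))
	for _, m := range c.msgs {
		out = append(out, m.data)
	}
	return out
}

func main() {
	ch := &LimitedChannel{MaxMsgs: 3, MaxAgeSec: 60}
	ch.Publish("m1", 0)
	ch.Publish("m2", 10)
	ch.Publish("m3", 20)
	ch.Publish("m4", 30) // exceeds MaxMsgs, so m1 is dropped
	fmt.Println(ch.Contents()) // [m2 m3 m4]

	ch.Expire(75) // m2 is 65 seconds old and expires
	fmt.Println(ch.Contents()) // [m3 m4]
}
```

The point to take away is that a subscriber that stays offline for too long can miss messages even with at-least-once delivery, because the limits are enforced regardless of acks.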

References

https://nats.io/
https://docs.nats.io/nats-concepts/intro
https://github.com/nats-io/nats-streaming-server
https://github.com/nats-io/nats.go
