Collector

A lightweight, distributed service that collects data from stream queues.

Prerequisites

  • The NATS cluster must have JetStream enabled

  • MongoDB version >= 5.0 is recommended so that time series collections can be used

  • Services and applications should operate within the same NATS tenant

Deploy

The collector service subscribes to stream queues and writes the received data to the database.

If you use time series collections, you must manually create the database first and then add the data stream. Set the time field of the time series collection to timestamp and the metadata field to metaField. The NATS stream name COLLECT_${key} corresponds to the database name ${key}.
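The naming rule above can be sketched as a pair of small helpers. This is only an illustration of the COLLECT_${key} convention; StreamName and KeyFromStream are hypothetical names and are not part of the collector package.

```go
package main

import (
	"fmt"
	"strings"
)

// streamPrefix is the stream name prefix described in this document.
const streamPrefix = "COLLECT_"

// StreamName returns the NATS stream name for a key (hypothetical helper).
func StreamName(key string) string {
	return streamPrefix + key
}

// KeyFromStream recovers the key from a stream name (hypothetical helper).
// ok is false when the prefix is missing or the remaining key is empty.
func KeyFromStream(stream string) (key string, ok bool) {
	if !strings.HasPrefix(stream, streamPrefix) {
		return "", false
	}
	key = strings.TrimPrefix(stream, streamPrefix)
	return key, key != ""
}

func main() {
	fmt.Println(StreamName("system")) // COLLECT_system

	key, ok := KeyFromStream("COLLECT_system")
	fmt.Println(key, ok) // system true
}
```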

The main container image is:

  • ghcr.io/weplanx/collector:latest

  • registry.cn-shenzhen.aliyuncs.com/weplanx/collector:latest

The following example uses a Kubernetes Deployment. Copy it and modify as needed.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: collector
spec:
  selector:
    matchLabels:
      app: collector
  template:
    metadata:
      labels:
        app: collector
    spec:
      containers:
        - image: registry.cn-shenzhen.aliyuncs.com/weplanx/collector:latest
          imagePullPolicy: Always
          name: collector
          env:
            - name: MODE
              value: release
            - name: NATS_HOSTS
              value: <*** your nats hosts ***>
            - name: NATS_NKEY
              value: <*** your nats nkey***>
            - name: DATABASE_URL
              value: <*** your mongodb url ***>
            - name: DATABASE_NAME
              value: <*** your mongodb name ***>

Environment

MODE

  • Working mode; defaults to debug

NATS_HOSTS *required

  • NATS connection hosts, separated by commas

NATS_NKEY *required

  • NATS NKEY authentication

DATABASE_URL *required

  • MongoDB connection address

DATABASE_NAME *required

  • MongoDB database name
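To make the rules above concrete, here is a minimal sketch of how a service might read these variables: MODE falls back to debug, NATS_HOSTS is split on commas, and the four required variables are validated. The Config struct and LoadConfig function are hypothetical and do not reflect the collector's actual implementation. The host and URL values in main are placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// Config mirrors the environment variables listed above (hypothetical struct).
type Config struct {
	Mode         string
	NatsHosts    []string
	NatsNkey     string
	DatabaseURL  string
	DatabaseName string
}

// LoadConfig reads variables through getenv, so os.Getenv can be passed in
// production and a map-backed lookup can be used in tests.
func LoadConfig(getenv func(string) string) (*Config, error) {
	cfg := &Config{Mode: getenv("MODE")}
	if cfg.Mode == "" {
		cfg.Mode = "debug" // documented default
	}

	var hosts string
	required := []struct {
		name string
		dst  *string
	}{
		{"NATS_HOSTS", &hosts},
		{"NATS_NKEY", &cfg.NatsNkey},
		{"DATABASE_URL", &cfg.DatabaseURL},
		{"DATABASE_NAME", &cfg.DatabaseName},
	}
	var missing []string
	for _, r := range required {
		v := strings.TrimSpace(getenv(r.name))
		if v == "" {
			missing = append(missing, r.name)
			continue
		}
		*r.dst = v
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf("missing required environment variables: %s",
			strings.Join(missing, ", "))
	}

	// NATS_HOSTS holds several hosts separated by commas.
	for _, h := range strings.Split(hosts, ",") {
		if h = strings.TrimSpace(h); h != "" {
			cfg.NatsHosts = append(cfg.NatsHosts, h)
		}
	}
	if len(cfg.NatsHosts) == 0 {
		return nil, fmt.Errorf("NATS_HOSTS contains no hosts")
	}
	return cfg, nil
}

func main() {
	// Placeholder values only; a real service would pass os.Getenv instead.
	env := map[string]string{
		"NATS_HOSTS":    "nats://a.example:4222,nats://b.example:4222",
		"NATS_NKEY":     "placeholder-nkey",
		"DATABASE_URL":  "mongodb://localhost:27017",
		"DATABASE_NAME": "example",
	}
	cfg, err := LoadConfig(func(k string) string { return env[k] })
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(cfg.Mode, len(cfg.NatsHosts))
}
```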

Client

The client manages collector configuration, data transmission, and dispatching. Install it in your application:

go get github.com/weplanx/collector

Initialize

// Create the nats client and then create the jetstream context
if js, err = nc.JetStream(nats.PublishAsyncMaxPending(256)); err != nil {
    panic(err)
}

// Create the transfer client
if x, err = client.New(js); err != nil {
    panic(err)
}

Set

err := x.Set(context.TODO(), client.StreamOption{
    Key:         "system",
    Description: "system example",
})

Update

err := x.Update(context.TODO(), client.StreamOption{
    Key:         "system",
    Description: "system example 123",
})

Get Info

result, err := x.Get("system")

Publish

err := x.Publish(context.TODO(), "system", client.Payload{
    Timestamp: time.Now(),
    Data: map[string]interface{}{
        "metadata": map[string]interface{}{
            "method":    method,
            "path":      string(c.Request.Path()),
            "user_id":   userId,
            "client_ip": c.ClientIP(),
        },
        "params":     string(c.Request.QueryString()),
        "body":       c.Request.Body(),
        "status":     c.Response.StatusCode(),
        "user_agent": string(c.Request.Header.UserAgent()),
    },
    XData: map[string]interface{}{},
})
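The Publish example above relies on variables from its surrounding request handler (c, method, userId). One way to keep the handler short is to collect those fields in a small type that builds the Data map. The sketch below assumes nothing about the collector client itself; RequestLog and its Data method are hypothetical and only reproduce the map keys shown in the example.

```go
package main

import "fmt"

// RequestLog holds the request fields used in the Publish example
// (hypothetical type, not part of the collector package).
type RequestLog struct {
	Method    string
	Path      string
	UserID    string
	ClientIP  string
	Params    string
	Body      []byte
	Status    int
	UserAgent string
}

// Data returns a map with the same keys as the Data field in the
// Publish example, with the request identity grouped under "metadata".
func (r RequestLog) Data() map[string]interface{} {
	return map[string]interface{}{
		"metadata": map[string]interface{}{
			"method":    r.Method,
			"path":      r.Path,
			"user_id":   r.UserID,
			"client_ip": r.ClientIP,
		},
		"params":     r.Params,
		"body":       r.Body,
		"status":     r.Status,
		"user_agent": r.UserAgent,
	}
}

func main() {
	entry := RequestLog{
		Method:   "GET",
		Path:     "/users",
		UserID:   "u1",
		ClientIP: "127.0.0.1",
		Status:   200,
	}
	data := entry.Data()
	fmt.Println(data["status"]) // 200
}
```

The result can then be passed as the Data field of client.Payload in the Publish call.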

Remove

err := x.Remove("system")

License

BSD-3-Clause License
