cim

XGVela cim is the XGVela Telco PaaS Container Interface Module(CIM).

Cim is used as a sidecar for other service, such as cmaas, tmaas. 

The goal is to decouple applications from infrastructure, such as message queues and make application development less difficult.

Cim is a Go program.  

  1. Cim listen etcd's change and then handle real application to modify it's config

  2. Cim provide standard NAT interface for loging & event, cim forward it to KAFKA or other backends

Entry point


The entry point is main.go
...
 func main() {
...
  #cim is a Nat server, and the real application is a Nat client, and need real application subscribe to CIM.

   s, err := server.NewServer(&serveropts)
...
  #cim is a K8S client
   kubernetes.KubeConnect, err = kubernetes.NewKubeConfig()
...
  #cim is prometheus client if CimConfig's PromEnabled is True, counter likes cim_config_push_failure_total and cim_config_push_success_total.
   if err := promutil.Init(); err != nil {
...
  #cim is a kafka client, for publish topics
   connection.ConnKafka()
...
  #cim is a etcd client, for watch configChange
   _, etcd_err := connection.EtcdConnect()

...

  #cim is a Logger, it can sent log to KAFKA
  agents.KafkaLogConn, _ = agents.IntialiseKafkaProducer()

Scenario 1 Configuration management


Cim watch etcd's change, and then handle real application to change config
main.go main -- > agents/config_manage.go WatchConfigChange<clientv3.Event> --> handleEtcdUpdate -> handleAppConfigUpdate -> Post real Application's /updateConfig method

...
  # the config changed is json format
  response, err := httpUtility.HttpPost("http://localhost:"+appPort+"/updateConfig", "application/json", jsonValue)
  if err != nil {
    logs.LogConf.Error("The HTTP request failed with error", err.Error())
    _, etcdErr := connection.ConnectEtcd.Put(context.Background(), commitConfigKeys, "-1")
    if etcdErr != nil {
      logs.LogConf.Error("error while updating etcd", etcdErr)
    }
    err = promutil.CounterAdd("cim_config_push_failure_total", 1, map[string]string{"pod": types.PodID})
    if err != nil {
      log.Println("error while adding values in counter for total push config failure", err)
    }
    return
  }

  err = promutil.CounterAdd("cim_config_push_success_total", 1, map[string]string{"pod": types.PodID})
...

Scenario 2 DayOneConfig


main.go -> agents/config_manager.go WatchDayOneConfig -> configLoader
handle real applicaton's loadConfig interface, if response has error, then restart the pod to enforce reload the configs

Can't find any loadConfig implement in XGVela's seedcode.
...
  response, err := httpUtility.HttpPost("http://localhost:"+appPort+"/api/v1/_operations/loadConfig", "application/json", data)
  if err != nil {
    logs.LogConf.Error("The HTTP request failed with error", err.Error())
    notify_util.RaiseEvent("LoadConfigRequestFailure", []string{"Load_Config_Request"}, []string{"FAIL"}, []string{"Error", "Action"}, []string{"Config load failed", "Load Changes Manually"})
    //restart pod
    kubernetes.KubeConnect.DeletePod(types.PodID, types.Namespace)
    return err
  }
...

Scenario 3 Log Forward


Real application subscribe the cim's, and send message to NAT Server with TOPIC not in ["CONFIG", "TEST"]
agents/subscriber_interface.go -> agents/log_agent.go HandleMessages
...
  s.containerID = string(logmsg.ContainerId())
  s.containerName = string(logmsg.ContainerName())

  fileName := s.PodID + "_" + s.Namespace + "_" + s.containerName + "-" + s.containerID + ".log"
  ljFilename = fileName

  switch types.CimConfigObj.CimConfig.Lmaas.LoggingMode {
  case "TCP":
    LogModeTCP(logmsg.Payload(), fileName, s)
  case "KAFKA":
    LogModeKafka(logmsg.Payload(), fileName, s)
  case "STDOUT":
    LogModeSTDOUT(logmsg.Payload(), s.containerName)
  case "FILEANDSTDOUT":
    LogModeSTDOUT(logmsg.Payload(), s.containerName)
    writeBuffer(append(logmsg.Payload(), '\n'), fileName, s)
  default:
    writeBuffer(append(logmsg.Payload(), '\n'), fileName, s)
...
  func LogModeKafka(logMsg []byte, fileName string, s *NatsSubscriber) {
    if KafkaLogConn == nil {
      writeBuffer(append(logMsg, '\n'), fileName, s)
      return
    }
    if !KafkaLogConn.writeToKafka(logMsg, s.containerName) {
      writeBuffer(append(logMsg, '\n'), fileName, s)
    }
  }
...

Scenario 4 Event Forward


Real application subscribe the cim, and send message to NAT Server with TOPIC in ["EVENT"]
agents/handle_agents.go -> agents/event_agent.go HandleMesages -> Push
...
  //Push messages to kafka
  func Push(ctx context.Context, key, value []byte, k *KafkaPublisher, header []kafka.Header) (err error) {
    message := kafka.Message{
      Key: key,
      Value: value,
      Time: time.Now(),
      Headers: header,
    }
    return k.Writer.WriteMessages(ctx, message)
  }
...