Skip to end of banner
Go to start of banner

cim

Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Current »

XGVela cim is the XGVela Telco PaaS Container Interface Module(CIM).

Cim is used as a sidecar for other service, such as cmaas, tmaas. 

The goal is to decouple applications from infrastructure, such as message queues and make application development less difficult.

Cim is a Go program.  

  1. Cim listen etcd's change and then handle real application to modify it's config

  2. Cim provide standard NAT interface for loging & event, cim forward it to KAFKA or other backends

Entry point


The entry point is main.go
...
109 func main() {
...
#cim is a Nat server, and the real application is a Nat client, and need real application subscribe to CIM.
126 s, err := server.NewServer(&serveropts)
...
#cim is a K8S client
145 kubernetes.KubeConnect, err = kubernetes.NewKubeConfig()
...
#cim is prometheus client if CimConfig's PromEnabled is True, counter likes cim_config_push_failure_total and cim_config_push_success_total.
189 if err := promutil.Init(); err != nil {
...
#cim is a kafka client, for publish topics
201 connection.ConnKafka()
...
#cim is a etcd client, for watch configChange
203 _, etcd_err := connection.EtcdConnect()

...

#cim is a Logger, it can sent log to KAFKA
217 agents.KafkaLogConn, _ = agents.IntialiseKafkaProducer()

Scenario 1 Configuration management


Cim watch etcd's change, and then handle real application to change config
main.go main -- > agents/config_manage.go WatchConfigChange<clientv3.Event> --> handleEtcdUpdate -> handleAppConfigUpdate -> Post real Application's /updateConfig method

...
# the config changed is json format
response, err := httpUtility.HttpPost("http://localhost:"+appPort+"/updateConfig", "application/json", jsonValue)
if err != nil {
logs.LogConf.Error("The HTTP request failed with error", err.Error())
_, etcdErr := connection.ConnectEtcd.Put(context.Background(), commitConfigKeys, "-1")
if etcdErr != nil {
logs.LogConf.Error("error while updating etcd", etcdErr)
}
err = promutil.CounterAdd("cim_config_push_failure_total", 1, map[string]string{"pod": types.PodID})
if err != nil {
log.Println("error while adding values in counter for total push config failure", err)
}
return
}

err = promutil.CounterAdd("cim_config_push_success_total", 1, map[string]string{"pod": types.PodID})
...

Scenario 2 DayOneConfig


main.go -> agents/config_manager.go WatchDayOneConfig -> configLoader
handle real applicaton's loadConfig interface, if response has error, then restart the pod to enforce reload the configs

Can't find any loadConfig implement in XGVela's seedcode.
...
response, err := httpUtility.HttpPost("http://localhost:"+appPort+"/api/v1/_operations/loadConfig", "application/json", data)
if err != nil {
logs.LogConf.Error("The HTTP request failed with error", err.Error())
notify_util.RaiseEvent("LoadConfigRequestFailure", []string{"Load_Config_Request"}, []string{"FAIL"}, []string{"Error", "Action"}, []string{"Config load failed", "Load Changes Manually"})
//restart pod
kubernetes.KubeConnect.DeletePod(types.PodID, types.Namespace)
return err
}
...

Scenario 3 Log Forward


Real application subscribe the cim's, and send message to NAT Server with TOPIC not in ["CONFIG", "TEST"]
agents/subscriber_interface.go -> agents/log_agent.go HandleMessages
...
s.containerID = string(logmsg.ContainerId())
s.containerName = string(logmsg.ContainerName())

fileName := s.PodID + "_" + s.Namespace + "_" + s.containerName + "-" + s.containerID + ".log"
ljFilename = fileName

switch types.CimConfigObj.CimConfig.Lmaas.LoggingMode {
case "TCP":
LogModeTCP(logmsg.Payload(), fileName, s)
case "KAFKA":
LogModeKafka(logmsg.Payload(), fileName, s)
case "STDOUT":
LogModeSTDOUT(logmsg.Payload(), s.containerName)
case "FILEANDSTDOUT":
LogModeSTDOUT(logmsg.Payload(), s.containerName)
writeBuffer(append(logmsg.Payload(), '\n'), fileName, s)
default:
writeBuffer(append(logmsg.Payload(), '\n'), fileName, s)

...
func LogModeKafka(logMsg []byte, fileName string, s *NatsSubscriber) {
if KafkaLogConn == nil {
writeBuffer(append(logMsg, '\n'), fileName, s)
return
}
if !KafkaLogConn.writeToKafka(logMsg, s.containerName) {
writeBuffer(append(logMsg, '\n'), fileName, s)
}
}

...

Scenario 4 Event Forward


Real application subscribe the cim, and send message to NAT Server with TOPIC in ["EVENT"]
agents/handle_agents.go -> agents/event_agent.go HandleMesages -> Push
...
//Push messages to kafka
func Push(ctx context.Context, key, value []byte, k *KafkaPublisher, header []kafka.Header) (err error) {
message := kafka.Message{
Key: key,
Value: value,
Time: time.Now(),
Headers: header,
}
return k.Writer.WriteMessages(ctx, message)
}
...

  • No labels