cim
XGVela cim is the XGVela Telco PaaS Container Interface Module (CIM).
Cim runs as a sidecar for other services, such as cmaas and tmaas.
The goal is to decouple applications from infrastructure such as message queues, and to make application development easier.
Cim is a Go program.
Cim watches etcd for changes and then calls the real application to update its configuration.
Cim provides a standard NATS interface for logging and events, and forwards these messages to Kafka or other backends.
Entry point
The entry point is main.go
...
func main() {
...
// cim is a NATS server and the real application is a NATS client; the real application must subscribe to cim.
s, err := server.NewServer(&serveropts)
...
// cim is a Kubernetes client
kubernetes.KubeConnect, err = kubernetes.NewKubeConfig()
...
// cim is a Prometheus client if CimConfig's PromEnabled is true; it maintains counters such as cim_config_push_failure_total and cim_config_push_success_total.
if err := promutil.Init(); err != nil {
...
// cim is a Kafka client, used to publish topics
connection.ConnKafka()
...
// cim is an etcd client, used to watch for config changes
_, etcd_err := connection.EtcdConnect()
...
// cim is a logger; it can send logs to Kafka
agents.KafkaLogConn, _ = agents.IntialiseKafkaProducer()
Scenario 1 Configuration management
Cim watches etcd for changes and then calls the real application to update its configuration.
main.go main -> agents/config_manage.go WatchConfigChange<clientv3.Event> -> handleEtcdUpdate -> handleAppConfigUpdate -> POST to the real application's /updateConfig endpoint
...
// the changed config is sent as JSON
response, err := httpUtility.HttpPost("http://localhost:"+appPort+"/updateConfig", "application/json", jsonValue)
if err != nil {
    logs.LogConf.Error("The HTTP request failed with error", err.Error())
    // record the failed push in etcd by writing "-1" to the commit key
    _, etcdErr := connection.ConnectEtcd.Put(context.Background(), commitConfigKeys, "-1")
    if etcdErr != nil {
        logs.LogConf.Error("error while updating etcd", etcdErr)
    }
    // count the failed push
    err = promutil.CounterAdd("cim_config_push_failure_total", 1, map[string]string{"pod": types.PodID})
    if err != nil {
        log.Println("error while adding values in counter for total push config failure", err)
    }
    return
}
// count the successful push
err = promutil.CounterAdd("cim_config_push_success_total", 1, map[string]string{"pod": types.PodID})
...
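The excerpt above shows only the Cim side of the exchange. As a rough illustration of the other side, here is a minimal sketch of an /updateConfig handler that a real application might expose. The `configStore` type, its free-form map and the `postConfig` helper are hypothetical names introduced for this example; the actual payload schema and what Cim expects in the response are not described here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// configStore holds the application's current configuration.
// The free-form map is an assumption: the payload schema is not given here.
type configStore struct {
	mu  sync.RWMutex
	cfg map[string]interface{}
}

// handleUpdateConfig replaces the stored configuration with the JSON body
// of a POST request and answers 400 if the body is not valid JSON.
func (s *configStore) handleUpdateConfig(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	var next map[string]interface{}
	if err := json.NewDecoder(r.Body).Decode(&next); err != nil {
		http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
		return
	}
	s.mu.Lock()
	s.cfg = next
	s.mu.Unlock()
	w.WriteHeader(http.StatusOK)
}

// get returns one configuration value and whether it is present.
func (s *configStore) get(key string) (interface{}, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.cfg[key]
	return v, ok
}

// postConfig feeds body to the handler in-process (no network) and
// returns the HTTP status code the handler produced.
func postConfig(s *configStore, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/updateConfig", strings.NewReader(body))
	rec := httptest.NewRecorder()
	s.handleUpdateConfig(rec, req)
	return rec.Code
}

func main() {
	store := &configStore{}
	// A real application would instead register the handler and listen on appPort:
	//   http.HandleFunc("/updateConfig", store.handleUpdateConfig)
	status := postConfig(store, `{"logLevel":"debug"}`)
	level, _ := store.get("logLevel")
	fmt.Println(status, level)
}
```

Rejecting malformed JSON with a 400 keeps the previous configuration in place, so a bad push does not leave the application half-configured.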
Scenario 2 DayOneConfig
main.go -> agents/config_manager.go WatchDayOneConfig -> configLoader
Cim calls the real application's loadConfig interface. If the request fails, Cim restarts the pod to force the configuration to be reloaded.
No loadConfig implementation could be found in XGVela's seed code.
...
response, err := httpUtility.HttpPost("http://localhost:"+appPort+"/api/v1/_operations/loadConfig", "application/json", data)
if err != nil {
    logs.LogConf.Error("The HTTP request failed with error", err.Error())
    notify_util.RaiseEvent("LoadConfigRequestFailure", []string{"Load_Config_Request"}, []string{"FAIL"}, []string{"Error", "Action"}, []string{"Config load failed", "Load Changes Manually"})
    // restart the pod
    kubernetes.KubeConnect.DeletePod(types.PodID, types.Namespace)
    return err
}
...
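The control flow in this excerpt (try to load, raise an event on failure, then restart the pod) can be isolated from its dependencies. The following is a minimal sketch under that assumption: `loadOrRestart` and its three callbacks are hypothetical stand-ins for the HTTP call, `notify_util.RaiseEvent` and `kubernetes.KubeConnect.DeletePod`, not the actual Cim code.

```go
package main

import (
	"errors"
	"fmt"
)

// errLoadFailed stands in for an HTTP failure when calling loadConfig.
var errLoadFailed = errors.New("loadConfig request failed")

// loadOrRestart calls load; if it fails, it raises an event, asks for a pod
// restart and returns the original error. All three callbacks are
// hypothetical stand-ins for the real HTTP, event and Kubernetes calls.
func loadOrRestart(load func() error, raiseEvent func(name string), restartPod func()) error {
	err := load()
	if err == nil {
		return nil
	}
	raiseEvent("LoadConfigRequestFailure")
	restartPod()
	return err
}

func main() {
	restarts := 0
	err := loadOrRestart(
		func() error { return errLoadFailed },             // simulate a failed request
		func(name string) { fmt.Println("event:", name) }, // print instead of publishing
		func() { restarts++ },                             // count instead of deleting a pod
	)
	fmt.Println("error:", err, "restarts:", restarts)
}
```

Passing the side effects in as functions makes the failure path easy to exercise without a cluster or a running application.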
Scenario 3 Log Forward
The real application subscribes to Cim and sends messages to the NATS server with a TOPIC not in ["CONFIG", "TEST"].
agents/subscriber_interface.go -> agents/log_agent.go HandleMessages
...
s.containerID = string(logmsg.ContainerId())
s.containerName = string(logmsg.ContainerName())
fileName := s.PodID + "_" + s.Namespace + "_" + s.containerName + "-" + s.containerID + ".log"
ljFilename = fileName
switch types.CimConfigObj.CimConfig.Lmaas.LoggingMode {
case "TCP":
    LogModeTCP(logmsg.Payload(), fileName, s)
case "KAFKA":
    LogModeKafka(logmsg.Payload(), fileName, s)
case "STDOUT":
    LogModeSTDOUT(logmsg.Payload(), s.containerName)
case "FILEANDSTDOUT":
    // write to stdout and to the log file
    LogModeSTDOUT(logmsg.Payload(), s.containerName)
    writeBuffer(append(logmsg.Payload(), '\n'), fileName, s)
default:
    // any other mode writes to the log file only
    writeBuffer(append(logmsg.Payload(), '\n'), fileName, s)
...
func LogModeKafka(logMsg []byte, fileName string, s *NatsSubscriber) {
    // no Kafka connection: fall back to the log file
    if KafkaLogConn == nil {
        writeBuffer(append(logMsg, '\n'), fileName, s)
        return
    }
    // Kafka write failed: fall back to the log file
    if !KafkaLogConn.writeToKafka(logMsg, s.containerName) {
        writeBuffer(append(logMsg, '\n'), fileName, s)
    }
}
...
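The dispatch and fallback behaviour shown above can be tried out without Kafka or a file system. Below is a minimal sketch that models each destination as a plain function. The `sinks` struct, `forward`, `withNewline` and `logFileName` are hypothetical names for this example; only the mode strings, the file name pattern and the fall-back-to-file rule are taken from the excerpts.

```go
package main

import "fmt"

// sinks groups the possible log destinations as plain functions.
// This struct is an illustration, not a Cim type.
type sinks struct {
	tcp    func(msg []byte)
	kafka  func(msg []byte) bool // reports success; nil means "not connected"
	stdout func(msg []byte)
	file   func(msg []byte)
}

// logFileName builds the per-container log file name using the same
// pattern as the excerpt: <pod>_<namespace>_<container>-<id>.log
func logFileName(podID, namespace, containerName, containerID string) string {
	return podID + "_" + namespace + "_" + containerName + "-" + containerID + ".log"
}

// withNewline returns a copy of msg terminated by '\n', leaving msg untouched.
func withNewline(msg []byte) []byte {
	out := make([]byte, 0, len(msg)+1)
	out = append(out, msg...)
	return append(out, '\n')
}

// forward sends msg to the sink selected by mode. In KAFKA mode it falls
// back to the file when Kafka is not connected or the write fails.
func forward(mode string, msg []byte, s sinks) {
	switch mode {
	case "TCP":
		s.tcp(msg)
	case "KAFKA":
		if s.kafka == nil || !s.kafka(msg) {
			s.file(withNewline(msg))
		}
	case "STDOUT":
		s.stdout(msg)
	case "FILEANDSTDOUT":
		s.stdout(msg)
		s.file(withNewline(msg))
	default:
		s.file(withNewline(msg))
	}
}

func main() {
	s := sinks{
		tcp:    func(msg []byte) { fmt.Printf("tcp: %s\n", msg) },
		kafka:  func(msg []byte) bool { return false }, // simulate a failed Kafka write
		stdout: func(msg []byte) { fmt.Printf("stdout: %s\n", msg) },
		file:   func(msg []byte) { fmt.Printf("file: %q\n", msg) },
	}
	fmt.Println(logFileName("pod-1", "default", "app", "c0ffee"))
	forward("KAFKA", []byte("hello"), s) // ends up in the file sink
}
```

Copying the message before appending the newline is a choice made for this sketch; it avoids modifying a buffer that the caller may still own.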
Scenario 4 Event Forward
The real application subscribes to Cim and sends messages to the NATS server with a TOPIC in ["EVENT"].
agents/handle_agents.go -> agents/event_agent.go HandleMesages -> Push
...
// Push publishes a single message to Kafka
func Push(ctx context.Context, key, value []byte, k *KafkaPublisher, header []kafka.Header) (err error) {
    message := kafka.Message{
        Key:     key,
        Value:   value,
        Time:    time.Now(),
        Headers: header,
    }
    return k.Writer.WriteMessages(ctx, message)
}
...
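Because Push takes a context, a caller can bound how long a publish is allowed to block. The following is a minimal sketch of that idea against an in-memory writer. The `message` struct, the `writer` interface, `memoryWriter`, `stuckWriter`, `pushWithTimeout` and `demoTimeout` are all hypothetical names for this example; only the Key, Value and Time fields and the `WriteMessages(ctx, ...)` call shape are taken from the excerpt, and headers are left out for brevity.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// demoTimeout is an arbitrary value chosen for this example.
const demoTimeout = 50 * time.Millisecond

// message mirrors the Key, Value and Time fields used in the excerpt.
type message struct {
	Key, Value []byte
	Time       time.Time
}

// writer is a hypothetical stand-in for the Kafka writer behind Push.
type writer interface {
	WriteMessages(ctx context.Context, msgs ...message) error
}

// memoryWriter records messages instead of sending them anywhere.
type memoryWriter struct{ sent []message }

func (m *memoryWriter) WriteMessages(ctx context.Context, msgs ...message) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.sent = append(m.sent, msgs...)
	return nil
}

// stuckWriter simulates an unreachable broker: it blocks until ctx is done.
type stuckWriter struct{}

func (stuckWriter) WriteMessages(ctx context.Context, msgs ...message) error {
	<-ctx.Done()
	return ctx.Err()
}

// pushWithTimeout publishes one message and gives up after timeout.
func pushWithTimeout(w writer, key, value []byte, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return w.WriteMessages(ctx, message{Key: key, Value: value, Time: time.Now()})
}

func main() {
	mem := &memoryWriter{}
	fmt.Println("in-memory:", pushWithTimeout(mem, []byte("pod-1"), []byte(`{"event":"demo"}`), demoTimeout))
	fmt.Println("stuck:", pushWithTimeout(stuckWriter{}, nil, nil, demoTimeout))
}
```

Whether a timeout is appropriate for events, and what to do with an event that could not be published, depends on delivery requirements that are not covered here.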