Topo-gw and Topo-engine

Part 1: topo-gw
The topo-gw pod contains 3 containers:

  •   init container: service-init, which checks the status of the etcd, Kafka and ZooKeeper clusters

  •   topo-gw.jar & JRE: the actual application

  •   cim: handles online configuration changes, notification forwarding and log forwarding

Main logic
    topo-gw is a Kubernetes client that watches for pod change events.
    When an event's pod annotations contain NF_ID, NF_TYPE, NF_SERVICE_ID and NF_SERVICE_TYPE,
    it builds a PodDetails object and sends it to Kafka, where topoengine consumes it.

The main logic is in PodWatcher.java
---
  if (isValid(annotations)) {

    // Ignore pods that belong to a different XGVela instance
    String xgvelaId = annotations.get(Constants.XGVELA_ID).asText();
    if (!xgvelaId.equals(selfXGVelaId)) {
      LOG.debug("Pod xgvelaId does not match: " + xgvelaId);
      return;
    }

    String nfName = annotations.get(Constants.NF_ID).asText();
    String nfType = annotations.get(Constants.NF_TYPE).asText();
    String nfServiceName = annotations.get(Constants.NF_SERVICE_ID).asText();
    String nfServiceType = annotations.get(Constants.NF_SERVICE_TYPE).asText();
    if (!Leader.leaderFlag.get()) {
      // A non-leader drops the event unless it concerns topo-gw itself;
      // topo-gw events are pushed to Kafka even by a non-leader
      if (!nfServiceName.equalsIgnoreCase("topo-gw")) {
        return;
      }
    }
    pushToKafka(
      new PodDetails(action, podName, namespace, nfName, nfType, nfServiceName, nfServiceType),
      nfName);
  }
---
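
The filtering rules in the excerpt can be restated as one decision function. The following is a minimal sketch, not the PodWatcher code: the class name, the plain Map in place of the JSON annotations node, and the literal key strings (the real values of Constants.NF_ID, Constants.XGVELA_ID and so on are not shown in this document) are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the checks in PodWatcher; not the real class.
class PodEventFilter {
    // Placeholder key strings: the actual Constants.* values are not shown here.
    static final List<String> REQUIRED =
            List.of("NF_ID", "NF_TYPE", "NF_SERVICE_ID", "NF_SERVICE_TYPE");
    static final String XGVELA_ID = "XGVELA_ID";

    /** Returns true when an event with these annotations would be pushed to Kafka. */
    static boolean shouldPublish(Map<String, String> annotations,
                                 String selfXgvelaId, boolean isLeader) {
        // 1. All four NF annotations must be present.
        if (annotations == null || !annotations.keySet().containsAll(REQUIRED)) {
            return false;
        }
        // 2. The pod must belong to this XGVela instance.
        if (!selfXgvelaId.equals(annotations.get(XGVELA_ID))) {
            return false;
        }
        // 3. A non-leader only publishes events about topo-gw itself.
        return isLeader || "topo-gw".equalsIgnoreCase(annotations.get("NF_SERVICE_ID"));
    }
}
```

With this sketch, a non-leader instance returns true for a pod whose NF_SERVICE_ID is "topo-gw" and false for any other service, while the leader returns true for both.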

Logging logic
topo-gw uses Log4j for its logging calls (LOG.debug, LOG.error, ...).
The Log4j configuration file routes log records to NatsAppender, which publishes them to the NATS server in cim; cim then forwards them to Kafka.

log4j2.xml
---
<Configuration packages="org.xgvela.logging">
  <Appenders>
    <Console name="STDOUT" target="SYSTEM_OUT">
      <PatternLayout
        pattern="[%d{yyyy-MM-dd HH:mm:ss.SSS}][%-5level][%corrId]:[%-5t] %logger{2}- %msg%n" />
    </Console>
    <NatsAppender name="NatsAppender">
      <PatternLayout
        pattern="[%d{yyyy-MM-dd HH:mm:ss.SSS}][%-5level][%corrId]:[%-5t] %logger{2}- %msg" />
    </NatsAppender>
  </Appenders>

  <Loggers>
    <Logger name="org.xgvela" level="debug" additivity="false">
      <AppenderRef ref="NatsAppender" />
      <AppenderRef ref="STDOUT" />
    </Logger>
    <Root level="error">
      <AppenderRef ref="NatsAppender" />
      <AppenderRef ref="STDOUT" />
    </Root>
  </Loggers>
</Configuration>
---
NatsAppender.java
---
  private void publishToNats(String record) {
    Connection natsConnection = NatsUtil.getConnection();
    byte[] buff = buildFlatBuff(record);
    natsConnection.publish("LOG", buff);
  }
---
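
To make the layout pattern in log4j2.xml easier to read, here is a small sketch that builds a line of the same shape with plain Java. It is not Log4j code: the class and method names are hypothetical, and the correlation id is simply passed in as a string, because the %corrId converter comes from the org.xgvela.logging package and its behaviour is not shown here. The sketch follows the NatsAppender pattern, which is the STDOUT pattern without the trailing %n.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that imitates the shape of the NatsAppender pattern.
class LogLineSketch {
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Imitates %logger{2}: keep the rightmost two dot-separated components.
    static String shortLogger(String name) {
        String[] parts = name.split("\\.");
        if (parts.length <= 2) {
            return name;
        }
        return parts[parts.length - 2] + "." + parts[parts.length - 1];
    }

    static String format(LocalDateTime time, String level, String corrId,
                         String thread, String logger, String msg) {
        // %-5level and %-5t left-justify and pad to at least five characters.
        return String.format("[%s][%-5s][%s]:[%-5s] %s- %s",
                TS.format(time), level, corrId, thread, shortLogger(logger), msg);
    }

    public static void main(String[] args) {
        System.out.println(format(LocalDateTime.of(2024, 1, 2, 3, 4, 5, 6_000_000),
                "INFO", "c-1", "main", "org.xgvela.topo.PodWatcher", "hello"));
        // prints: [2024-01-02 03:04:05.006][INFO ][c-1]:[main ] topo.PodWatcher- hello
    }
}
```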

Online configuration change is not enabled in the topo-gw application.
Notification is not enabled in the topo-gw application.

Part 2: Topoengine
The Topoengine pod contains 3 containers:

  •   init container: service-init, which checks the status of the etcd, Kafka and ZooKeeper clusters

  •   topoengine.jar & JRE: the actual application

  •   cim: handles online configuration changes, notification forwarding and log forwarding

Main logic
Topoengine listens on a predefined Kafka topic for pod events and updates its ManagedElement, the logical topology tree, accordingly.

EventListener.java
---
@KafkaListener(topics = Constants.KAFKA_TOPIC, containerFactory = "kafkaListenerContainerFactory")
private void listen(@Payload PodDetails podDetails,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    @Header(X_CORRELATION_ID) String messageCorrelationId,
    @Header(KafkaHeaders.OFFSET) String offset) {

  try {
    LOG.debug("Partition: " + partition + ", KafkaMsg Offset: " + offset + ", CorrelationId: " + messageCorrelationId + ", " + podDetails.toString());
    // Blocks until the latch held by ConstructTree is released
    ConstructTree.kafkaListenerLatch.await();
    manager.updateManagedElement(podDetails);
  } catch (Exception e) {
    LOG.error(e.getMessage(), e);
  }
}
---
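
The call to ConstructTree.kafkaListenerLatch.await() suggests that the listener is held back until some initial step has finished. A minimal sketch of that gating pattern with java.util.concurrent.CountDownLatch follows; it assumes the latch is released once the initial tree has been built, which this document does not state explicitly, and all class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration of gating a listener behind a one-shot latch.
class LatchGateSketch {
    private final CountDownLatch treeReady = new CountDownLatch(1);
    private final List<String> log = new CopyOnWriteArrayList<>();

    // Stand-in for the listener body: wait for the latch, then apply the event.
    void onMessage(String event) {
        try {
            treeReady.await();
            log.add("applied " + event);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stand-in for the end of the initial tree construction.
    void finishInitialBuild() {
        log.add("tree built");
        treeReady.countDown(); // releases every waiting listener thread
    }

    static List<String> demo() {
        LatchGateSketch gate = new LatchGateSketch();
        Thread listener = new Thread(() -> gate.onMessage("ADDED pod-1"));
        listener.start();          // the message may arrive before the tree exists
        gate.finishInitialBuild(); // the event is applied only after this point
        try {
            listener.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(gate.log);
    }
}
```

Whichever thread runs first, demo() returns "tree built" followed by "applied ADDED pod-1", because the listener cannot pass await() before countDown() has been called.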

TopoManager.java
---
  switch (podDetails.getAction()) {
    case ADDED:
      addToTree(podDetails, nfId, nfDn, nfServiceId, nfServiceDn, nfServiceInstanceId, nfServiceInstanceDn);
      break;
    case MODIFIED:
      updateInTree(nfServiceInstanceId, nfServiceId, nfId, podDetails);
      break;
    case DELETED:
      deleteFromTree(nfServiceInstanceId, nfServiceId, nfId, podDetails);
      break;
    case ERROR:
      break;
    default:
      break;
  }
---
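
The switch above dispatches on the pod action; the tree operations themselves are not shown. As a rough sketch of what a three-level tree (NF, NF service, NF service instance) could look like, the following uses nested maps. Everything in it is an assumption for illustration: the class name, treating MODIFIED as an upsert, storing a single state string per instance, and pruning empty parents on delete.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical nested-map model of the topology tree; not TopoManager itself.
class TopoTreeSketch {
    enum Action { ADDED, MODIFIED, DELETED, ERROR }

    // NF id -> NF service id -> NF service instance id -> last seen state
    final Map<String, Map<String, Map<String, String>>> tree = new TreeMap<>();

    void apply(Action action, String nfId, String serviceId, String instanceId, String state) {
        switch (action) {
            case ADDED:
            case MODIFIED:
                // Create missing parents on the way down, then insert or overwrite the leaf.
                tree.computeIfAbsent(nfId, k -> new TreeMap<>())
                    .computeIfAbsent(serviceId, k -> new TreeMap<>())
                    .put(instanceId, state);
                break;
            case DELETED: {
                Map<String, Map<String, String>> services = tree.get(nfId);
                if (services == null) break;
                Map<String, String> instances = services.get(serviceId);
                if (instances == null) break;
                instances.remove(instanceId);
                // Assumption: parents with no children left are removed as well.
                if (instances.isEmpty()) services.remove(serviceId);
                if (services.isEmpty()) tree.remove(nfId);
                break;
            }
            default:
                break; // ERROR is ignored, as in the excerpt
        }
    }
}
```

Applying ADDED for ("nf1", "svc1", "pod-1", "Running") leaves the tree as {nf1={svc1={pod-1=Running}}}; a following DELETED for the same ids empties it again.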

The logging logic is the same as topo-gw's.

Online configuration change
  cim watches etcd for changes and then calls the application's /updateConfig endpoint; the application applies the new configuration immediately.
  The application then reports the result back to cim, and cim updates the status of that configuration change in etcd.

UpdateConfigController.java
---
@PostMapping(path = "/updateConfig")
@ResponseStatus(HttpStatus.OK)
public @ResponseBody JsonNode updateConfig(@RequestBody JsonNode requestBody) throws JsonProcessingException {
  LOG.info("===> Received request for config-update: " + requestBody.toPrettyString());

  UpdateRequest updateRequest = Config.mapper.treeToValue(requestBody, UpdateRequest.class);
  boolean status = Config.applyPatch(Config.mapper.readTree(updateRequest.patch));

  ObjectNode cimResponse = JsonNodeFactory.instance.objectNode();
  cimResponse.put("change-set-key", updateRequest.changeSet);
  cimResponse.put("revision", updateRequest.revision);

  // default: success
  cimResponse.put("status", "success");
  cimResponse.put("remarks", "successfully applied the config update");

  // failure case
  if (!status) {
    LOG.debug("Config update failed");
    cimResponse.put("status", "failure");
    cimResponse.put("remarks", "failed to apply the config update");
  }

  // publish event to CIM over NATS
  Config.publishResponse(cimResponse, status);

  LOG.info("<=== Configuration upto date");
  return cimResponse;
}
---
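
The request and response handling in the controller can be tried out without Spring or Jackson using the sketch below. It is only an approximation: the class name is hypothetical, the patch is modelled as a flat map of key/value replacements rather than a JSON document, and rejecting unknown keys is an invented rule standing in for whatever Config.applyPatch really checks. Only the four response fields are taken from the excerpt.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of the /updateConfig handling.
class ConfigUpdateSketch {
    // Live configuration of the application (keys are made up for the example).
    final Map<String, String> config = new LinkedHashMap<>();

    // Stand-in for Config.applyPatch: replaces existing keys, rejects unknown ones.
    boolean applyPatch(Map<String, String> patch) {
        if (!config.keySet().containsAll(patch.keySet())) {
            return false; // nothing is changed when the patch is rejected
        }
        config.putAll(patch);
        return true;
    }

    // Builds the same four response fields as the controller excerpt.
    Map<String, String> updateConfig(String changeSet, String revision, Map<String, String> patch) {
        boolean ok = applyPatch(patch);
        Map<String, String> response = new LinkedHashMap<>();
        response.put("change-set-key", changeSet);
        response.put("revision", revision);
        response.put("status", ok ? "success" : "failure");
        response.put("remarks", ok
                ? "successfully applied the config update"
                : "failed to apply the config update");
        return response;
    }
}
```

In the real flow this response is also published to cim, which then records the outcome in etcd; the sketch stops at building the response.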

Notification
  When the topology tree changes, topoengine sends a notification through its notifier, which uses NotificationUtil.sendEvent
  to publish the event to the NATS server in cim; cim then forwards it to Kafka.

NotificationUtil.java 
---
  Event.startEvent(builder);
  Event.addEventName(builder, name);
  Event.addEventTime(builder, eventTime);
  Event.addContainerId(builder, container);

  if (mngdObj != 0)
    Event.addManagedObject(builder, mngdObj);
  if (addInfo != 0)
    Event.addAdditionalInfo(builder, addInfo);
  if (thrInfo != 0)
    Event.addThresholdInfo(builder, thrInfo);
  if (stChDef != 0)
    Event.addStateChangeDefinition(builder, stChDef);
  if (monAttr != 0)
    Event.addMonitoredAttributes(builder, monAttr);

  Event.addSourceId(builder, srcId);
  Event.addSourceName(builder, srcName);

  int event = Event.endEvent(builder);

  builder.finish(event);

  byte[] buffer = builder.sizedByteArray();
  builder.clear();
  LOG.info("Publishing event: " + eventName + " to NATS for source: " + sourceName);
  publishEvent(buffer);
---
  private static void publishEvent(byte[] message) {
    Connection natsConnection = NatsUtil.getConnection();
    natsConnection.publish("EVENT", message);
  }
---
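
Both the logging path (subject "LOG") and the notification path (subject "EVENT") follow the same route: the application publishes to a NATS subject and cim forwards the message to Kafka. The sketch below models that route entirely in memory. It does not use the NATS or Kafka client libraries; the class name, the Kafka topic names, the event name and the plain-text payloads (the real code sends FlatBuffers) are all assumptions for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the NATS server inside cim.
class SubjectBusSketch {
    private final Map<String, List<Consumer<byte[]>>> subscribers = new HashMap<>();

    void subscribe(String subject, Consumer<byte[]> handler) {
        subscribers.computeIfAbsent(subject, k -> new ArrayList<>()).add(handler);
    }

    // Delivers the payload to every subscriber of the subject, if any.
    void publish(String subject, byte[] payload) {
        for (Consumer<byte[]> handler : subscribers.getOrDefault(subject, List.of())) {
            handler.accept(payload);
        }
    }

    static List<String> demo() {
        SubjectBusSketch nats = new SubjectBusSketch();
        // Records "topic:payload" instead of producing to a real Kafka cluster.
        List<String> kafka = new ArrayList<>();

        // cim side: forward each subject to a Kafka topic (topic names are made up).
        nats.subscribe("EVENT", p -> kafka.add("events-topic:" + new String(p, StandardCharsets.UTF_8)));
        nats.subscribe("LOG", p -> kafka.add("logs-topic:" + new String(p, StandardCharsets.UTF_8)));

        // Application side: same subjects as publishEvent and publishToNats.
        nats.publish("EVENT", "TopoTreeChanged".getBytes(StandardCharsets.UTF_8));
        nats.publish("LOG", "tree updated".getBytes(StandardCharsets.UTF_8));
        return kafka;
    }
}
```

Calling SubjectBusSketch.demo() returns the two forwarded records in publish order: "events-topic:TopoTreeChanged" and "logs-topic:tree updated".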