Uh… hay alguien aqui? Bueno, al menos soy consistente (no escribiendo). Bienvenidos de vuelta a otro post donde hablo un poco de los problemas que me voy encontrando y de como los voy solucionando.

Hoy os traigo un problema que me encontre probando una solucion local para probar enviar eventos desde un bus de Kafka a un emulador de Google Pub/Sub con el minimo dolor posible.

Empecemos por lo basico:

  • Apache Kafka: Tecnologia de Apache que consiste en una cola de mensajes asincrona. Tienes Producers, que envian mensajes a una cola o Topic, y un Consumer, que lee de manera ordenada y asincrona el Topic. El Topic esta ordenado por particiones y offsets y hay algunas consideraciones mas, pero esto es lo fundamental.

  • Kafka Connect: Una herramienta para enviar datos desde y hacia Kafka desde sistemas externos. Hay versiones opensource pero yo he trabajado solo con la solucion de Confluence, que es quien comercializa soluciones de Kafka para empresas (dando ACLs y cositas varias). Funciona con un motor principal al que se le anaden microaplicaciones hechas en Java que hacen tareas en concreto.

  • Google Pub/Sub: Otra tecnologia de cola de mensajes asincrona. Tienes Publishers que envian informacion a un Topic y un Subscriber que lee mensajes del Topic. Parecido, eh? Hay mas diferencias pero honestamente no controlo mucho sobre ellas ni son importantes para este post.

El plan

Queremos conectar los 3 sistemas, y ademas en local porque somos un poco masocas. Yo ya tengo configurado un entorno en Docker que me levanta un broker de Kafka y un Kafka Connect (algun dia, por 2030, os hablare de mi entorno de WSL + Docker sin Docker Desktop porque es el orgullo de mis amores). Por lo tanto, nos hace falta montar algo para emular un Pub/Sub. Ademas, necesitamos alguna manera de poder interaccionar con el sistema emulado.

Con diferencia de la solucion de Apache, Pub/Sub esta pensado para ser puramente cloud, por lo que no es tan sencillo como montarse un on-promise en local y ya. Hay alguien en Google que me tiene aprecio y ha publicado amablemente un emulador para el Pub/Sub integrado en medio de su suite de utilidades.

Como entendereis, como loco de Docker, paso de instalar a mano y me voy a buscar la vida para montarme una imagen con las utilidades. Afortunadamente, Google nos proporciona la imagen ya hecha.

Por otro lado, el cliente consiste en unas utilidades que dejo Google en un repo de GitHub para interactuar con el emulador. Samaritanos tambien han hecho una imagen de estas utilidades, pero con el ligero detalle de que estan bastante desactualizadas, asi que tuve que hacerme un Dockerfile para ponerlo todo un poco al dia.

Docker Compose

Aqui teneis lo que hace falta anadir a un docker-compose.yml:

pubsub-client:
  image: pubsub-python-client:1.0
  container_name: pubsub-client
  links:
    - "pubsub-emulator"
  environment:
    - "PUBSUB_PROJECT_ID=test-project"
    - "PUBSUB_EMULATOR_HOST=pubsub-emulator:8538"
  #command: publisher.py test-project create testTopic
  tty: true
 
pubsub-emulator:
  image: gcr.io/google.com/cloudsdktool/google-cloud-cli:latest
  container_name: pubsub-emulator
  ports:
    - "8538:8538"
  command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8538

Dockerfile para el cliente

Y aqui el Dockerfile para el pubsub-python-client:1.0:

FROM nightfury1204/alpine-curl:latest
RUN /bin/sh
ENV ALPINE_VERSION=3.7
ENV PACKAGES="dumb-init musl libc6-compat linux-headers build-base bash git ca-certificates python3 python3-dev"
RUN /bin/sh -c echo \
  && echo "http://dl-cdn.alpinelinux.org/alpine/v$ALPINE_VERSION/testing" > /etc/apk/repositories \
  && echo "http://dl-cdn.alpinelinux.org/alpine/v$ALPINE_VERSION/community" >> /etc/apk/repositories \
  && echo "http://dl-cdn.alpinelinux.org/alpine/v$ALPINE_VERSION/main" >> /etc/apk/repositories \
  && apk add --no-cache $PACKAGES || \
    (sed -i -e 's/dl-cdn/dl-4/g' /etc/apk/repositories && apk add --no-cache $PACKAGES) \
  && echo "http://dl-cdn.alpinelinux.org/alpine/v$ALPINE_VERSION/main/" > /etc/apk/repositories \
  && if [[ ! -e /usr/bin/python ]]; then ln -sf /usr/bin/python3 /usr/bin/python; fi \
  && if [[ ! -e /usr/bin/python-config ]]; then ln -sf /usr/bin/python3-config /usr/bin/python-config; fi \
  && if [[ ! -e /usr/bin/pydoc ]]; then ln -sf /usr/bin/pydoc3 /usr/bin/pydoc; fi \
  && if [[ ! -e /usr/bin/easy_install ]]; then ln -sf $(ls /usr/bin/easy_install*) /usr/bin/easy_install; fi \
  && easy_install pip \
  && pip install --upgrade pip \
  && if [[ ! -e /usr/bin/pip ]]; then ln -sf /usr/bin/pip3 /usr/bin/pip; fi
ENTRYPOINT ["/usr/bin/dumb-init"]
RUN "python"
RUN apk add git
RUN git clone https://github.com/googleapis/python-pubsub
WORKDIR /python-pubsub/
RUN git checkout tags/v2.12.1 -b current
WORKDIR /python-pubsub/samples/snippets
RUN pwd
RUN ls
RUN pip install -r requirements.txt
ENTRYPOINT ["python"]

El problema

Okay, pues listo, no? Levantamos el bus, el Connect, el cliente y el emulador, creamos topics en los dos buses, enchufamos eventos al bus y creamos un conector para enviarlos a Pub/Sub.

Pues no va.

Resulta que el emulador es bastante especialito con como quiere recibir los eventos, y el conector tambien se las trae:

  1. El conector de base no admite el hecho de que no haya autentificacion (lo cual hace falta para enviar eventos al emulador)
  2. El emulador hace un handshake SSL incompatible con el sistema cloud (what. the. fuck.) por lo que tienes que establecer un canal de comunicaciones entre el cliente y el emulador sin cifrar, lo cual obviamente tampoco soporta el conector

Esta todo descrito en la documentacion de Google

Documentacion del emulador

La solucion

Pues toca abrir el conector y tocar estas cosillas. Afortunadamente para vosotros, ya lo he hecho (al menos en el caso de un conector Kafka PubSub) y lo teneis disponible en este repo de GitHub.

// ...
 
gcpCredentialsProvider = NoCredentialsProvider.create();
 
// ...
 
private void createPublisher() {
  ProjectTopicName fullTopic = ProjectTopicName.of(cpsProject, cpsTopic);
  ManagedChannel channel = ManagedChannelBuilder
    .forTarget(cpsEndpoint)
    .usePlaintext()
    .build();
  TransportChannelProvider channelProvider = FixedTransportChannelProvider
    .create(GrpcTransportChannel.create(channel));
 
  com.google.cloud.pubsub.v1.Publisher.Builder builder =
    Publisher.newBuilder(fullTopic)
      .setChannelProvider(channelProvider)
      .setCredentialsProvider(gcpCredentialsProvider);
  try {
    publisher = builder.build();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

Recomplilas el conector, lo pones en el Connect, reintentas y voila! Eventos de Kafka a PubSub.

Afortunadamente, los problemas del emulador no los sufres en un entorno real, porque el conector se conecta sin problema a Google, suponiendo que no tengas mucho firewall dando problemas por en medio (lo cual puede ser un problemilla, ya que el conector no admite de manera nativa ningun tipo de conexion por proxy, asi que te la tienes que picar a manini).

Hasta aqui mi historia de hoy. Nos vemos quien sabe cuando para hablaros de otras cosas totalmente irrisorias que me asolan en el curro.

Chao!