Post

Apache Kafka + Kafka Connect + Emulador de Pub/Sub

Uh... ¿hay alguien aquí? Bueno, al menos soy consistente (no escribiendo). Bienvenidos de vuelta a otro post donde hablo un poco de los problemas que me voy encontrando y de cómo los voy solucionando.
Hoy os traigo un problema que me encontré probando una solución local para probar enviar eventos desde un bus de Kafka a un emulador de Google Pub/Sub con el mínimo dolor posible.

Empecemos por lo básico:

  • Apache Kafka: Tecnología de Apache que consiste en una cola de mensajes asíncrona. Tienes Producers, que envían mensajes a una cola o Topic, y un Consumer, que lee de manera ordenada y asíncrona el Topic. El Topic está ordenado por particiones y offsets y hay algunas consideraciones más, pero esto es lo fundamental.

  • Kafka Connect: Una herramienta para enviar datos desde y hacia Kafka desde sistemas externos. Hay versiones opensource pero yo he trabajado solo con la solución de Confluence, que es quien comercializa soluciones de Kafka para empresas (dando ACLs y cositas varias). Funciona con un motor principal al que se le añaden microaplicaciones hechas en Java que hacen tareas en concreto.

  • Google Pub/Sub: Otra tecnología de cola de mensajes asíncrona. Tienes Publishers que envían información a un Topic y un Subscriber que lee mensajes del Topic. Parecido, ¿eh? Hay más diferencias pero honestamente no controlo mucho sobre ellas ni son importantes para este post.

Okay, queremos conectar los 3 sistemas, y además en local porque somos un poco masocas. Yo ya tengo configurado un entorno en Docker que me levanta un broker de Kafka y un Kafka Connect (algún día, por 2030, os hablaré de mi entorno de WSL + Docker sin Docker Desktop porque es el orgullo de mis amores). Por lo tanto, nos hace falta montar algo para emular un Pub/Sub. Además, necesitamos alguna manera de poder interaccionar con el sistema emulado.

Con diferencia de la solución de Apache, Pub/Sub está pensado para ser puramente cloud, por lo que no es tan sencillo como montarse un on-promise en local y ya. Hay alguien en Google que me tiene aprecio y ha publicado amablemente un emulador para el Pub/Sub integrado en medio de su suite de utilidades.
Como entenderéis, como loco de Docker, paso de instalar a mano y me voy a buscar la vida para montarme una imagen con las utilidades. Afortunadamente, Google nos proporciona la imagen ya hecha.

Por otro lado, el cliente consiste en unas utilidades que dejó Google en un repo de GitHub para interactuar con el emulador. Samaritanos también han hecho una imagen de estas utilidades, pero con el ligero detalle de que están bastante desactualizadas, así que tuve que hacerme un Dockerfile para ponerlo todo un poco al día.

Aquí tenéis lo que hace falta añadir a un docker-compose.yml:

  pubsub-client:
    image: pubsub-python-client:1.0
    container_name: pubsub-client
    links:
      - "pubsub-emulator"
    environment:
      - "PUBSUB_PROJECT_ID=test-project"
      - "PUBSUB_EMULATOR_HOST=pubsub-emulator:8538"
    #command: publisher.py test-project create testTopic
    tty: true
  pubsub-emulator:
    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:latest
    container_name: pubsub-emulator
    ports:
      - "8538:8538"
    command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8538

Y aquí el Dockerfile para el pubsub-python-client:1.0:

FROM nightfury1204/alpine-curl:latest
RUN /bin/sh
ENV ALPINE_VERSION=3.7
ENV PACKAGES="dumb-init	musl	libc6-compat	linux-headers	build-base	bash	git	ca-certificates	python3	python3-dev"
RUN /bin/sh -c echo   && echo "http://dl-cdn.alpinelinux.org/alpine/v$ALPINE_VERSION/testing" > /etc/apk/repositories   && echo "http://dl-cdn.alpinelinux.org/alpine/v$ALPINE_VERSION/community" >> /etc/apk/repositories   && echo "http://dl-cdn.alpinelinux.org/alpine/v$ALPINE_VERSION/main" >> /etc/apk/repositories   && apk add --no-cache $PACKAGES ||     (sed -i -e 's/dl-cdn/dl-4/g' /etc/apk/repositories && apk add --no-cache $PACKAGES)   && echo "http://dl-cdn.alpinelinux.org/alpine/v$ALPINE_VERSION/main/" > /etc/apk/repositories   && if [[ ! -e /usr/bin/python ]];        then ln -sf /usr/bin/python3 /usr/bin/python; fi   && if [[ ! -e /usr/bin/python-config ]]; then ln -sf /usr/bin/python3-config /usr/bin/python-config; fi   && if [[ ! -e /usr/bin/pydoc ]];         then ln -sf /usr/bin/pydoc3 /usr/bin/pydoc; fi   && if [[ ! -e /usr/bin/easy_install ]];  then ln -sf $(ls /usr/bin/easy_install*) /usr/bin/easy_install; fi   && easy_install pip   && pip install --upgrade pip   && if [[ ! -e /usr/bin/pip ]]; then ln -sf /usr/bin/pip3 /usr/bin/pip; fi
ENTRYPOINT ["/usr/bin/dumb-init"]
RUN "python"
RUN apk add git
RUN git clone https://github.com/googleapis/python-pubsub
WORKDIR /python-pubsub/
RUN git checkout tags/v2.12.1 -b current
WORKDIR /python-pubsub/samples/snippets
RUN pwd
RUN ls
RUN pip install -r requirements.txt
ENTRYPOINT ["python"]

¿Okay, pues listo, no? Levantamos el bus, el Connect, el cliente y el emulador, creamos topics en los dos buses, enchufamos eventos al bus y creamos un conector para enviarlos a Pub/Sub.
Pues no va.

Okay, ¿cuál es el problema? Resulta que el emulador es bastante especialito con cómo quiere recibir los eventos, y el conector también se las trae. Primero, el conector de base no admite el hecho de que no haya autentificación (lo cual hace falta para enviar eventos al emulador). Segundo, el emulador hace un handshake SSL incompatible con el sistema cloud (what. the. fuck.) por lo que tienes que establecer un canal de comunicaciones entre el cliente y el emulador sin cifrar, lo cual obviamente tampoco soporta el conector.
Está todo descrito en esta página:

Pues toca abrir el conector y tocar estas cosillas. Afortunadamente para vosotros, ya lo he hecho para vosotros (al menos en el caso de un conector Kafka --> PubSub) y lo tenéis disponible en este repo de GitHub.

[...]

 gcpCredentialsProvider = NoCredentialsProvider.create();

[...]

  private void createPublisher() {
    ProjectTopicName fullTopic = ProjectTopicName.of(cpsProject, cpsTopic);
	ManagedChannel channel = ManagedChannelBuilder.forTarget(cpsEndpoint).usePlaintext().build();
	TransportChannelProvider channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

	com.google.cloud.pubsub.v1.Publisher.Builder builder =
		Publisher.newBuilder(fullTopic)
			.setChannelProvider(channelProvider)
			.setCredentialsProvider(gcpCredentialsProvider);
    try {
      publisher = builder.build();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

Recomplilas el conector, lo pones en el Connect, reintentas y ¡voilà! Eventos de Kafka a PubSub. Afortunadamente, los problemas del emulador no los sufres en un entorno real, porque el conector se conecta sin problema a Google, suponiendo que no tengas mucho firewall dando problemas por en medio (lo cual puede ser un problemilla, ya que el conector no admite de manera nativa ningún tipo de conexión por proxy, así que te la tienes que picar a manini).

Hasta aquí mi historia de hoy. Nos vemos quien sabe cuando para hablaros de otras cosas totalmente irrisorias que me asolan en el curro.
¡Chao!

This post is licensed under CC BY 4.0 by the author.