Python and Pulsar



examples/python-pulsar-cli/consumer.py
import pulsar
import time
from mytools import get_logger, topic

def receive():
    logger = get_logger('pulsar')
    logger.info('Consumer starting')
    time.sleep(20)
    logger.info('Consumer really starting')

    try:
        client = pulsar.Client('pulsar://my-pulsar:6650')
        consumer = client.subscribe(topic, 'my-subscription')
    except Exception:
        logger.exception("Consumer could not connect to pulsar")
    logger.info("Consumer connected")


    while True:
        msg = consumer.receive()
        try:
            logger.info("Received: {}: {}".format(msg.data(), msg.message_id()))
            consumer.acknowledge(msg)
        except Exception as err:
            logger.error(f"Exception {err}")


receive()

examples/python-pulsar-cli/docker-compose.yml
version: '3.7'
services:
  producer:
    build: .
    volumes:
      - .:/opt
    links:
      - pulsar
    #command: python producer.py
    command: tail -f /dev/null
  consumer:
    build: .
    volumes:
      - .:/opt
    links:
      - pulsar
    command: tail -f /dev/null
    #command: python consumer.py
  pulsar:
    image: apachepulsar/pulsar:2.5.2
    container_name: my-pulsar
    expose:
       - 8080
       - 6650
    command: >
      /bin/bash -c
      "bin/apply-config-from-env.py conf/standalone.conf
      && bin/pulsar standalone"

examples/python-pulsar-cli/Dockerfile
FROM python:3
COPY requirements.txt /opt/
RUN pip3 install -r /opt/requirements.txt

WORKDIR /opt

examples/python-pulsar-cli/input.txt
First
Second
Third
Fourth

examples/python-pulsar-cli/mytools.py
import logging
import os

def get_logger(name):
    log_file = name + '.log'
    log_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)-10s - %(message)s')

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    sh = logging.StreamHandler()
    sh.setLevel(logging.INFO)
    sh.setFormatter( log_format )
    logger.addHandler(sh)

    #if os.path.exists(log_file):
    #    os.unlink(log_file)
    fh = logging.FileHandler(log_file)
    fh.setLevel(logging.INFO)
    fh.setFormatter( log_format )
    logger.addHandler(fh)

    return logger

topic = 'text'

examples/python-pulsar-cli/producer.py
import pulsar
import time
from mytools import get_logger, topic


def send():
    logger = get_logger('pulsar')
    logger.info("Producer starting")
    time.sleep(20)
    logger.info("Producer really starting")

    filename = 'input.txt'

    try:
        client = pulsar.Client('pulsar://my-pulsar:6650')
        producer = client.create_producer(topic)
    except Exception:
        logger.exception("Producer could not connect to pulsar")
    logger.info("Producer connected")

    with open(filename) as fh:
        for row in fh:
            logger.info(f"Sending {row}")
            producer.send(row.encode('utf-8'))
            time.sleep(1)

send()

examples/python-pulsar-cli/requirements.txt
pulsar-client

Run:


docker-compose up

and then check the pulsar.log file