Kafka handmade analytics
Author Cloudapp
E.G.

Next.js 14 / Upstash Kafka - Handmade High-Performance Analytics

July 5, 2024

Upstash Kafka is a good fit for website analytics because of its serverless architecture, which eliminates the need to manage infrastructure and reduces operational complexity and costs. Its pay-as-you-go pricing scales with usage, making it suitable for both small and large-scale applications. Upstash Kafka offers high throughput and low latency, which is crucial for real-time data processing and analytics.

Integration

Its integration capabilities with various data sources and analytics tools streamline the data pipeline, enabling seamless ingestion, processing, and analysis. Additionally, the distributed architecture of Upstash Kafka provides data durability and reliability, so that analytics data is consistently available. The ease of use, coupled with robust security features, makes Upstash Kafka a reliable and efficient choice for handling large volumes of website analytics data.

5 Key Capabilities of Kafka

  1. Real-Time Data Processing: Kafka excels in real-time data processing, enabling businesses to perform analytics on live data streams, which is essential for timely insights and decision-making.

  2. Scalability and Fault Tolerance: Kafka’s distributed architecture allows for seamless scalability and robust fault tolerance, ensuring reliable data ingestion and analytics even as data volumes grow.

  3. High Throughput and Low Latency: Kafka is designed for high throughput and low latency, making it ideal for analytics applications that require quick data ingestion and immediate processing.

  4. Integration with Analytics Tools: Kafka integrates seamlessly with various analytics tools and platforms, such as Apache Spark and Apache Flink, enhancing its capabilities for comprehensive data analysis.

  5. Efficient Data Storage and Retrieval: Kafka’s efficient data storage and retrieval mechanisms support large-scale data analytics, enabling organizations to store, access, and analyze massive datasets with ease.

By leveraging these advantages, Kafka empowers businesses to enhance their analytics capabilities, leading to more informed decision-making and improved operational efficiency.

Here is the GitHub repo with the entire code, where you can see all the details regarding my Kafka Integration.

Used Stack

I will start with my default stack:

  • Next.js 14 as the web framework, and I will use the provided middleware edge function

  • Upstash Kafka, in combination with the NPM Package @upstash/kafka

  • Neon.tech Serverless Postgres

  • Vercel for hosting

One new NPM Package

npm install @upstash/kafka

Get Started with a New Kafka Cluster

Setting up a new Kafka cluster is relatively easy. Log in to Upstash:

Login and create an Upstash Kafka Cluster

At the top, select “Kafka”, and below on the right, click “+ Create Cluster”. Choose a cluster name, a region, and the replica type, and you are done.

Kafka-Upstash-create-cluster

In the second step, create a “Topic,” which is the place where your messages will be stored and where you select the “Cleanup Policy” and the “Retention Settings”.

Kafka-Upstash-create-topic

Getting the needed data for the .env.local

Kafka-Upstash-settings

Adding Env-Variables in .env.local

Use the three values copied before in your local .env.local file:

# Upstash Kafka
KAFKA_URL=https://xxx-kafka.upstash.io
KAFKA_USERNAME=xxxx
KAFKA_PASSWORD=xxxx

Fetch Request Details in middleware.ts

As a first step, I import the “Kafka” package into the middleware.ts file under src and add “NextFetchEvent” from next/server as well.

import { Kafka } from "@upstash/kafka"; 
import type { NextFetchEvent, NextRequest } from "next/server";

Then, I define the connection with the prepared .env variables.

const kafka = new Kafka({
  url: process.env.KAFKA_URL || "",
  username: process.env.KAFKA_USERNAME || "",
  password: process.env.KAFKA_PASSWORD || "",
});
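
The `|| ""` fallbacks above satisfy TypeScript, but they also let a missing variable slip through as an empty string, so the problem only shows up when the first message is produced. A minimal sketch of a stricter alternative, assuming a hypothetical helper `readKafkaEnv` (not part of `@upstash/kafka`) that receives the environment as a plain object:

```typescript
// Hypothetical helper (not part of @upstash/kafka): collects the three
// connection values and fails fast when one is missing or empty.
interface KafkaEnv {
  url: string;
  username: string;
  password: string;
}

function readKafkaEnv(env: Record<string, string | undefined>): KafkaEnv {
  const required = ["KAFKA_URL", "KAFKA_USERNAME", "KAFKA_PASSWORD"] as const;
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  return {
    url: env.KAFKA_URL as string,
    username: env.KAFKA_USERNAME as string,
    password: env.KAFKA_PASSWORD as string,
  };
}
```

In the middleware, this could then be used as `new Kafka(readKafkaEnv(process.env))`, since the returned object has the same three keys as the configuration shown above.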

The next step is to add the event to the function definition.

export async function middleware(request: NextRequest, event: NextFetchEvent) 

Now, I am ready to get the details from the “request” object and push them to our Kafka cluster. The first part, const newDate = ..., sets the “created_at” attribute of the messagekafka object. I add two hours because toISOString() returns UTC and I want the stored value to match my local time (I am living in the CET time zone, which is UTC+2 in summer ;-))

Below the creation of the “messagekafka” object, I start the “producer” and define the “topic” (remember the name that you set up in Upstash).

On the last line, I push the data to the cluster.

const newDate = new Date();
  newDate.setHours(newDate.getHours() + 2);

  const messagekafka = {
    country: request.geo?.country,
    city: request.geo?.city,
    region: request.geo?.region,
    pathname: request.nextUrl.pathname,
    url: request.url,
    ip: request.headers.get("x-real-ip"),
    exip: request.ip,
    nexturl: request.headers.get("next-url"),
    mobile: request.headers.get("sec-ch-ua-mobile"),
    platform: request.headers.get("sec-ch-ua-platform"),
    useragent: request.headers.get("user-agent"),
    referer: request.headers.get("referer"),
    fetchsite: request.headers.get("sec-fetch-site"),
    created_at: newDate.toISOString(),
  };

  const p = kafka.producer();
  const topic = "tracking";

  event.waitUntil(p.produce(topic, JSON.stringify(messagekafka)));
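
A note on the timestamp: `toISOString()` always prints UTC, so adding two hours produces a string that shows local summer time but still ends in `Z`. The sketch below is only an illustration; `shiftedIso` and `berlinHour` are hypothetical helper names, and `Europe/Berlin` is an assumed time zone. It shows the fixed shift used above next to a time-zone-aware alternative that also follows the switch to winter time.

```typescript
// Same effect as the +2 h shift in the middleware, without mutating the input.
function shiftedIso(date: Date, hours: number): string {
  return new Date(date.getTime() + hours * 60 * 60 * 1000).toISOString();
}

// Alternative: keep UTC in storage and convert only for display.
// "Europe/Berlin" is an assumed zone; the hour follows daylight saving time.
function berlinHour(date: Date): string {
  const parts = new Intl.DateTimeFormat("en-GB", {
    timeZone: "Europe/Berlin",
    hour: "2-digit",
    hour12: false,
  }).formatToParts(date);
  return parts.find((part) => part.type === "hour")?.value ?? "";
}

const summer = new Date("2024-07-05T10:00:00.000Z");
const winter = new Date("2024-01-05T10:00:00.000Z");

console.log(shiftedIso(summer, 2)); // 2024-07-05T12:00:00.000Z
console.log(berlinHour(summer)); // 12 (UTC+2 in summer)
console.log(berlinHour(winter)); // 11 (UTC+1 in winter)
```

With a fixed offset, winter timestamps end up one hour ahead of local time. Storing plain UTC and converting when the dashboard renders avoids that.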

Complete middleware.ts file

import { NextResponse } from "next/server";
import type { NextFetchEvent, NextRequest } from "next/server";
import { fallbackLng, locales } from "@/app/i18n/settings";
import { Kafka } from "@upstash/kafka";

const apikey = process.env.API_KEY;

const kafka = new Kafka({
  url: process.env.KAFKA_URL || "",
  username: process.env.KAFKA_USERNAME || "",
  password: process.env.KAFKA_PASSWORD || "",
});

export async function middleware(request: NextRequest, event: NextFetchEvent) {
  //Middleware to track user data
  const newDate = new Date();
  newDate.setHours(newDate.getHours() + 2);

  const messagekafka = {
    country: request.geo?.country,
    city: request.geo?.city,
    region: request.geo?.region,
    pathname: request.nextUrl.pathname,
    url: request.url,
    ip: request.headers.get("x-real-ip"),
    exip: request.ip,
    nexturl: request.headers.get("next-url"),
    mobile: request.headers.get("sec-ch-ua-mobile"),
    platform: request.headers.get("sec-ch-ua-platform"),
    useragent: request.headers.get("user-agent"),
    referer: request.headers.get("referer"),
    fetchsite: request.headers.get("sec-fetch-site"),
    created_at: newDate.toISOString(),
  };

  const p = kafka.producer();
  const topic = "tracking";

  event.waitUntil(p.produce(topic, JSON.stringify(messagekafka)));

  // Check if there is any supported locale in the pathname
  const pathname = request.nextUrl.pathname;

  // Check if the default locale is in the pathname
  if (
    pathname.startsWith(`/${fallbackLng}/`) ||
    pathname === `/${fallbackLng}`
  ) {
    return NextResponse.redirect(
      new URL(
        pathname.replace(
          `/${fallbackLng}`,
          pathname === `/${fallbackLng}` ? "/" : ""
        ),
        request.url
      ),
      301
    );
  }

  // Check if the pathname is missing any locale
  const pathnameIsMissingLocale = locales.every(
    (locale) => !pathname.startsWith(`/${locale}/`) && pathname !== `/${locale}`
  );

  if (pathnameIsMissingLocale) {
    const RewriteUrl = request.nextUrl;
    RewriteUrl.pathname = `/${fallbackLng}${pathname}`;

    return NextResponse.rewrite(new URL(RewriteUrl, request.url));
  }
}

export const config = {
  /*
   * Match all request paths except for the ones starting with:
   * - api (API routes)
   * - sitemap.xml, robots.txt
   * - _next/static (static files)
   * - _next/image (image optimization files)
   * - favicons, images (asset folders)
   * - favicon.ico (favicon file)
   */
  matcher: [
    "/((?!api|sitemap.xml|robots.txt|_next/static|_next/image|favicons|images|favicon.ico).*)",
  ],
};
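
To get a feeling for which requests the matcher lets through to the tracking code, the pattern can be tried against a few paths. This is only an approximation: Next.js compiles matchers itself, and the sketch below simply anchors the same pattern as a plain JavaScript `RegExp`. `isTracked` is a hypothetical helper name.

```typescript
// Approximation of the matcher from the config above as a plain RegExp.
// Next.js compiles matchers itself, so treat this as an illustration only.
const trackedPath = new RegExp(
  "^/((?!api|sitemap.xml|robots.txt|_next/static|_next/image|favicons|images|favicon.ico).*)$"
);

function isTracked(pathname: string): boolean {
  return trackedPath.test(pathname);
}

console.log(isTracked("/")); // true
console.log(isTracked("/blog/kafka")); // true
console.log(isTracked("/api/user/tracking")); // false
console.log(isTracked("/_next/static/chunk.js")); // false
// The lookahead is a prefix check, so any path that merely starts with
// "api" or "images" is skipped as well.
console.log(isTracked("/apiary")); // false
```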

Startup Dev Environment

Let’s test it with the command

npm run dev
Open your browser and go to http://localhost:3000, and you will see messages popping up in your cluster.
Kafka-Upstash-messages

Adding Table to Prisma Schema

The Kafka Connection is up and running. Now, I will create the schema for the new Postgres table in Prisma.

model tracking {
  id         Int       @id @default(autoincrement())
  country    String?
  city       String?
  region     String?
  url        String?
  nexturl    String?
  ip         String?
  pathname   String?
  mobile     String?
  platform   String?
  useragent  String?
  referer    String?
  fetchsite  String?
  created_at DateTime?
  addedOn    DateTime  @default(now())
}

After saving the schema.prisma file in the prisma folder, run:

npx prisma db push

Now, you should see the table in the DB schema in the neon.tech console.

API Route for reading Kafka messages

Now, I create a new API route in my Next.js project.

import { NextRequest, NextResponse } from "next/server";
import { Kafka } from "@upstash/kafka";
export const dynamic = "force-dynamic";

const apikey = process.env.API_KEY;

const kafka = new Kafka({
  url: process.env.KAFKA_URL || "",
  username: process.env.KAFKA_USERNAME || "",
  password: process.env.KAFKA_PASSWORD || "",
});

async function sendToPrisma(message: any) {
  const response = await fetch(
    `${process.env.NEXT_PUBLIC_BASE_URL}/api/user/tracking`,
    {
      method: "POST",
      body: JSON.stringify(message),
      headers: new Headers({
        "Content-Type": "application/json",
        "x-api-key": apikey || "",
      }),
    }
  );

  const data = await response.json();
  const returnData = data?.data;

  if (response.ok) {
    return NextResponse.json(
      {
        returnData,
      },
      { status: 200 }
    );
  } else {
    return NextResponse.json(
      { error: "Failed to Post Tracking Data" },
      { status: 500 }
    );
  }
}

export async function POST(request: NextRequest) {
  if (request.headers.get("x-api-key") !== apikey) {
    return new NextResponse(
      JSON.stringify({ status: "fail", message: "You are not authorized" }),
      { status: 401 }
    );
  }

  const c = kafka.consumer();

  const messages = await c.consume({
    consumerGroupId: "trackingGroup",
    instanceId: "trackingInstance",
    topics: ["tracking"],
    autoOffsetReset: "earliest",
  });

  // Wait for every insert to finish before responding. forEach does not
  // await async callbacks, so the handler could otherwise return too early.
  await Promise.all(
    messages.map(async (message) => {
      const content = message.value;
      await sendToPrisma(JSON.parse(content));
    })
  );

  if (messages !== null) {
    return NextResponse.json({ messages: messages.length || 0 });
  } else {
    return NextResponse.json(
      { error: "Failed to fetch messages from Kafka Cluster" },
      { status: 500 }
    );
  }
}
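
Because the route reports how many messages it handled, it can help to know how many inserts actually succeeded. A minimal sketch, assuming a hypothetical helper `processAll` that takes the raw message values and an async handler (standing in for `sendToPrisma`), waits for all of them, and counts the outcomes:

```typescript
// Hypothetical helper: runs the handler for every message value, waits for
// all calls to finish, and counts how many succeeded and how many failed.
async function processAll(
  values: string[],
  handler: (value: string) => Promise<void>
): Promise<{ ok: number; failed: number }> {
  const outcomes = await Promise.all(
    values.map(async (value) => {
      try {
        await handler(value);
        return true;
      } catch {
        return false;
      }
    })
  );
  const ok = outcomes.filter((succeeded) => succeeded).length;
  return { ok, failed: outcomes.length - ok };
}

// Stand-in handler for the example: rejects when the value is not valid JSON.
async function fakeInsert(value: string): Promise<void> {
  JSON.parse(value);
}

processAll(['{"pathname":"/"}', "not json"], fakeInsert).then((summary) => {
  console.log(summary); // { ok: 1, failed: 1 }
});
```

Catching errors per message means one bad record does not hide the result of the others; whether failed messages should be retried is a separate design decision.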

Here, we don’t define a Kafka producer, as we did in middleware.ts; instead, we create a consumer with kafka.consumer() to read the messages of a topic. For “consumerGroupId” and “instanceId” you can choose any name. Kafka tracks the read position (the offset) per consumer group, so if several independent consumers read the same topic, each should have its own “consumerGroupId” and thereby its own offset when fetching the messages.

const c = kafka.consumer();

  const messages = await c.consume({
    consumerGroupId: "trackingGroup",
    instanceId: "trackingInstance",
    topics: ["tracking"],
    autoOffsetReset: "earliest",
  });
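
The role of the group ID can be illustrated with a toy model. The class below is a deliberately simplified in-memory sketch, not how Kafka or Upstash store offsets, and `ToyTopic` and `dashboardGroup` are made-up names. Starting a new group at position 0 corresponds to `autoOffsetReset: "earliest"`.

```typescript
// Toy in-memory model of a topic with per-group offsets. It only illustrates
// the idea; it is not how Kafka or Upstash store offsets.
class ToyTopic {
  private log: string[] = [];
  private offsets = new Map<string, number>();

  produce(message: string): void {
    this.log.push(message);
  }

  // Returns the messages the group has not seen yet and advances its offset.
  consume(consumerGroupId: string): string[] {
    const start = this.offsets.get(consumerGroupId) ?? 0;
    this.offsets.set(consumerGroupId, this.log.length);
    return this.log.slice(start);
  }
}

const topic = new ToyTopic();
topic.produce("page view 1");
topic.produce("page view 2");

console.log(topic.consume("trackingGroup")); // [ 'page view 1', 'page view 2' ]
console.log(topic.consume("trackingGroup")); // []
topic.produce("page view 3");
console.log(topic.consume("trackingGroup")); // [ 'page view 3' ]
// A different group has its own offset and starts from the beginning.
console.log(topic.consume("dashboardGroup").length); // 3
```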

API Route for sending data to Postgres

As you have probably seen, the consumer route calls a second API route, which writes the data to Postgres via Prisma. (src/app/api/user/tracking)

import prisma from "@/lib/prisma";
import { NextResponse } from "next/server";

const apikey = process.env.API_KEY;

export async function POST(req: Request) {
  if (req.headers.get("x-api-key") !== apikey) {
    return new NextResponse(
      JSON.stringify({ status: "fail", message: "You are not authorized" }),
      { status: 401 }
    );
  }

  const {
    country,
    city,
    region,
    pathname,
    url,
    nexturl,
    ip,
    mobile,
    platform,
    useragent,
    referer,
    fetchsite,
    created_at,
  } = await req.json();

  // Insert one row; the field names match the columns of the tracking model
  const data = await prisma.tracking.create({
    data: {
      country,
      city,
      region,
      pathname,
      url,
      nexturl,
      ip,
      mobile,
      platform,
      useragent,
      referer,
      fetchsite,
      created_at,
    },
  });

  return NextResponse.json({
    data,
  });
}
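
The route above writes whatever JSON it receives straight into the table. A minimal sketch of a check that could run before `prisma.tracking.create`, assuming a hypothetical helper `toTrackingMessage` and assuming that `pathname`, `url`, and `created_at` are the fields you want to require (the Prisma model itself marks them as optional):

```typescript
// A few of the fields produced by the middleware in this article.
// Only these are checked here to keep the sketch short.
interface TrackingMessage {
  pathname: string;
  url: string;
  created_at: string;
}

// Hypothetical helper: returns the value typed as TrackingMessage, or null
// when it is not an object with the required string fields.
function toTrackingMessage(value: unknown): TrackingMessage | null {
  if (typeof value !== "object" || value === null) return null;
  const record = value as Record<string, unknown>;
  const hasStrings = ["pathname", "url", "created_at"].every(
    (key) => typeof record[key] === "string"
  );
  if (!hasStrings) return null;
  // created_at must also be a parseable date for the DateTime column.
  if (Number.isNaN(Date.parse(record.created_at as string))) return null;
  return record as unknown as TrackingMessage;
}

const valid = toTrackingMessage({
  pathname: "/",
  url: "http://localhost:3000/",
  created_at: "2024-07-05T12:00:00.000Z",
});
console.log(valid !== null); // true
console.log(toTrackingMessage({ pathname: "/" })); // null
```

In the route, a `null` result could be answered with a 400 status instead of attempting the insert.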

Calling the endpoint

If you now do a POST call to the endpoint:

http://localhost:3000/api/startConsumer

You will get the number of processed messages in the response and should see the data in your Postgres table. Don’t forget to pass the API key in the “x-api-key” header.
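
A minimal sketch of such a call from a script, assuming a hypothetical helper `buildConsumerCall`; the key `my-secret-key` is a placeholder for the value of `API_KEY` in your .env.local:

```typescript
// Hypothetical helper: builds the arguments for fetch() so that the
// API key header is never forgotten.
interface ConsumerCall {
  url: string;
  init: { method: "POST"; headers: Record<string, string> };
}

function buildConsumerCall(baseUrl: string, apiKey: string): ConsumerCall {
  // Remove trailing slashes so the path is joined with exactly one slash.
  const base = baseUrl.replace(/\/+$/, "");
  return {
    url: `${base}/api/startConsumer`,
    init: { method: "POST", headers: { "x-api-key": apiKey } },
  };
}

// "my-secret-key" is a placeholder, not a real key.
const call = buildConsumerCall("http://localhost:3000/", "my-secret-key");
console.log(call.url); // http://localhost:3000/api/startConsumer

// With the dev server running, the call itself would look like this:
// const response = await fetch(call.url, call.init);
// console.log(await response.json()); // e.g. { messages: 3 }
```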

Neon-Console-data

In the following stories, we will go ahead and automate the import to Postgres via Vercel Cron and build an Analytics Dashboard step by step.

Cloudapp-dev, and before you leave us

Thank you for reading until the end. Before you go:

Please consider clapping and following the writer! 👏 on our Medium Account

Or follow us on Twitter -> Cloudapp.dev
