How to decode this string into a map?

Hello everyone,

I am using Finch to stream a request to the GPT API with the “stream: true” parameter.
I have some chunks coming in that look like this:

"data: {\"id\":\"chatcmpl-7Clw4u0uTpNpvxt6IH8G6leJbFkbK\",\"object\":\"chat.completion.chunk\",\"created\":1683278556,\"model\":\"gpt-3.5-turbo-0301\",\"choices\":[{\"delta\":{\"content\":\"10\"},\"index\":0,\"finish_reason\":null}]}\n\n"

I want to decode this string into a map to retrieve the content, but if I try to use Jason.decode! I get an error: “unexpected byte at position 0”.
I’m not sure how to proceed from here: I tried cleaning the string by removing the outside quotes, but that removes all the quotes. And I’m not sure why there is \n\n at the end of the received chunk?

This is probably something really easy I’m overlooking, but I hope someone can help :slight_smile:

Thank you in advance

Hey @babouinette!

Looking at the string the way you have it, it is not valid JSON, so Jason.decode! fails.

To make it work, you can either wrap it as "{\"data\": ...}" (putting an object around the inner contents of data:), or simply remove the data: part. Either way you get a valid JSON payload string that you can decode.

Example:

s = "{\"data\": {\"id\":\"chatcmpl-7Clw4u0uTpNpvxt6IH8G6leJbFkbK\",\"object\":\"chat.completion.chunk\",\"created\":1683278556,\"model\":\"gpt-3.5-turbo-0301\",\"choices\":[{\"delta\":{\"content\":\"10\"},\"index\":0,\"finish_reason\":null}]}\n\n}"
Jason.decode!(s) 
# %{
#   "data" => %{
#     "choices" => [
#       %{"delta" => %{"content" => "10"}, "finish_reason" => nil, "index" => 0}
#     ],
#     "created" => 1683278556,
#     "id" => "chatcmpl-7Clw4u0uTpNpvxt6IH8G6leJbFkbK",
#     "model" => "gpt-3.5-turbo-0301",
#     "object" => "chat.completion.chunk"
#   }
# }

s = "{\"id\":\"chatcmpl-7Clw4u0uTpNpvxt6IH8G6leJbFkbK\",\"object\":\"chat.completion.chunk\",\"created\":1683278556,\"model\":\"gpt-3.5-turbo-0301\",\"choices\":[{\"delta\":{\"content\":\"10\"},\"index\":0,\"finish_reason\":null}]}\n\n"
Jason.decode!(s)
# %{
#   "choices" => [
#     %{"delta" => %{"content" => "10"}, "finish_reason" => nil, "index" => 0}
#   ],
#   "created" => 1683278556,
#   "id" => "chatcmpl-7Clw4u0uTpNpvxt6IH8G6leJbFkbK",
#   "model" => "gpt-3.5-turbo-0301",
#   "object" => "chat.completion.chunk"
# }

Ninja’d, but just remove the leading "data: " from the string and it should parse fine with Jason.decode!

# "data: " is 6 characters, so keep everything from index 6 onwards
data
|> String.slice(6..-1)
|> Jason.decode!()
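Or, if you’d rather not hard-code the offset, something like this should work too (a small variation, same idea — the trailing \n\n is fine, since JSON allows surrounding whitespace):

data
|> String.replace_prefix("data: ", "")
|> Jason.decode!()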

Thank you both for your answers. I tried doing slice(6..-1) but I get this:


"data: {\"id\":\"chatcmpl-7CmvKlql9qEwSZINv2mgNT2rM2AX3\",\"object\":\"chat.completion.chunk\",\"created\":1683282354,\"model\":\"gpt-3.5-turbo-0301\",\"choices\":[{\"delta\":{\"role\":\"assistant\"},\"index\":0,\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-7CmvKlql9qEwSZINv2mgNT2rM2AX3\",\"object\":\"chat.completion.chunk\",\"created\":1683282354,\"model\":\"gpt-3.5-turbo-0301\",\"choices\":[{\"delta\":{\"content\":\"1\"},\"index\":0,\"finish_reason\":null}]}\n\n"

sliced 

{"id":"chatcmpl-7CmvKlql9qEwSZINv2mgNT2rM2AX3","object":"chat.completion.chunk","created":1683282354,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{"role":"assistant"},"index":0,"finish_reason":null}]}

I don’t understand why the quotes around the string disappear, and why the \n\n at the end disappears too (I get the same “unexpected byte” error, but I guess that is because there are no quotes around it?).

I also don’t get why it modifies the inside of the string and removes all the backslashes (\).

Can you post a snippet of the code where you are doing this?

1 Like

Of course :slight_smile:

 IO.puts("got new chunk ")
    IO.inspect(value)
   decoded1 = value |> String.slice(6..-1)
    IO.puts("sliced ")
   IO.puts(decoded1)

As I suspected, the difference is just how you print the value: IO.puts vs IO.inspect :slight_smile: IO.puts renders the string (escapes like \" and \n are interpreted), while IO.inspect prints the Elixir representation of the term, with the surrounding quotes and escape sequences intact.

It should be fine to run Jason.decode! on your decoded1 value.
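A minimal illustration of the difference (just a throwaway string, nothing from your app):

s = "a \"quoted\" word\n\n"
IO.puts(s)    # prints: a "quoted" word (followed by blank lines)
IO.inspect(s) # prints: "a \"quoted\" word\n\n"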


I just saw that some of the chunks are like this:

"data: {\"id\":\"chatcmpl-7CnHmH7nb9iDLbWlrGKUBcxtWBxGM\",\"object\":\"chat.completion.chunk\",\"created\":1683283746,\"model\":\"gpt-3.5-turbo-0301\",\"choices\":[{\"delta\":{\"role\":\"assistant\"},\"index\":0,\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-7CnHmH7nb9iDLbWlrGKUBcxtWBxGM\",\"object\":\"chat.completion.chunk\",\"created\":1683283746,\"model\":\"gpt-3.5-turbo-0301\",\"choices\":[{\"delta\":{\"content\":\"1\"},\"index\":0,\"finish_reason\":null}]}\n\n"

with \n\n and then another data: {…

Another example:

"data: {\"id\":\"chatcmpl-7CnKneAqJalga79Um9i35WHleTmyb\",\"object\":\"chat.completion.chunk\",\"created\":1683283933,\"model\":\"gpt-3.5-turbo-0301\",\"choices\":[{\"delta\":{},\"index\":0,\"finish_reason\":\"stop\"}]}\n\ndata: [DONE]\n\n"

So even when I slice it, it is not valid JSON. And you were right, it was because of IO.puts :slight_smile:
I will try to sort out this problem; I guess it comes from the accumulator function.

Edit: I confirmed this is the problem. When the chunks are formatted correctly, the decode into a map works well. But when a chunk has another “data:” part at the end, I get an error.

Hum, that part seems to be related to the way the stream works :thinking:
To be honest, I’ve never used it, but I found these two links that I believe could help you fill in the missing pieces. (And by your description they appear to have a very similar use case to yours.)

My guess is that you’re missing the part of the code that reduces the chunks you get from the API until you receive the final data: [DONE]\n\n of the stream.
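For instance, something along these lines should work (a sketch, assuming each network chunk holds one or more complete events; the stream is in server-sent-events format, where each event is a data: … line terminated by a blank line, which is where the \n\n comes from):

chunk
|> String.split("\n\n", trim: true)
|> Enum.map(&String.replace_prefix(&1, "data: ", ""))
|> Enum.reject(&(&1 == "[DONE]"))
|> Enum.map(&Jason.decode!/1)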


Woah, thank you for the links, I will read them :slight_smile: I’ll keep you posted.
Edit: yes, it seems to be exactly my case, so I should get it working with the links. Thanks a lot :slight_smile:


OK, I’m nearly there :slight_smile:

I now have:

consume_func = fn
  {:status, value}, {_, headers, body} ->
    {value, headers, body}

  {:headers, value}, {status, headers, body} ->
    {status, headers ++ value, body}

  {:data, value}, {status, headers, body} ->
    if value == "data: [DONE]\n\n" do
      {status, headers, body}
    else
      value =
        value
        |> String.replace_prefix("data: ", "")
        |> String.replace_suffix("\n\n", "")
        |> String.split("\n\ndata: ")
        |> Enum.reduce("", fn data, acc ->
          if is_nil(data) || data == "[DONE]" do
            acc
          else
            value = Jason.decode!(data)

            # Default "delta" to %{} (not ""), so the chained Map.get
            # doesn't raise on chunks without a delta.
            value =
              value["choices"]
              |> List.first(%{})
              |> Map.get("delta", %{})
              |> Map.get("content", "")

            new_response = acc <> value
            IO.inspect(new_response)

            send(pid, {:new_response, new_response})

            new_response
          end
        end)

      {status, headers, [value | body]}
    end
end

And in the liveview:

@impl true
def handle_info({:new_response, value}, socket) do
  IO.puts("new response in liveview")
  IO.inspect(value)

  {:noreply,
   socket
   |> assign(:response, socket.assigns.response <> value)}
end

But I get this response in the terminal:

""
"1"
","
" "
"2"
","
" "
"3"
","
" "
"4"
","
" "
"5"
"."
""
new response in liveview
""
new response in liveview
"1"
new response in liveview
","
new response in liveview
" "
new response in liveview
"2"
new response in liveview
","
new response in liveview
" "
new response in liveview
"3"
new response in liveview
","
new response in liveview
" "
new response in liveview
"4"
new response in liveview
","
new response in liveview
" "
new response in liveview
"5"
new response in liveview
"."
new response in liveview
""

Why do the events all get handled at the end? I would expect: response from accumulator → sends message → response in liveview → again from the accumulator → etc.

Why are all the messages processed at the end of the response and not at every step?

I’m not sure I understood your question now…

Is it the fact that you also get " " and "," sent to the LiveView? What do the printed numbers represent? Is each one a new response coming from the stream?

Also, I’m not sure where your code is placed and what extra stuff it is doing, but don’t forget that:

Multiple processes can send messages to the same target process, but that target process can only process one message at a time.

If all those messages are being handled together at the end, when they were supposed to be handled individually, you could have a situation where something in between is blocking the pid process.
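A small self-contained sketch of that rule (nothing from your app, just an illustration of mailbox queuing):

pid =
  spawn(fn ->
    receive do
      :work ->
        Process.sleep(1_000) # simulate a long-running handler
        IO.puts("work done")
    end

    # Only now does the process get to the next queued message.
    receive do
      :ping -> IO.puts("got ping")
    end
  end)

send(pid, :work)
send(pid, :ping) # sits in the mailbox until :work finishes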

Sorry, I will try to give a little bit more info. My liveview calls the OpenAI module to start a streaming request that uses the accumulator function shown above. The function receives chunks of data from the stream and displays them correctly in the terminal (so each chunk is written in real time). But when I add the send to the calling liveview and try to update the response in the assigns, the whole answer gets written at the end (instead of every time a chunk is received and a message is sent to the liveview).

I want to show each chunk in real time in the interface, but I don’t understand why the “sent to liveview” messages are all handled at the end. I am asking ChatGPT to count from 1 to 5, so the answer received is okay, but I want each chunk (so “1”, “,”, “2”, etc.) sent to the liveview and processed in real time.

But from your response I’m thinking the liveview is blocked by the handler that starts the request and waits for the answer. So when the answer comes back, that handler is finished, and all the other messages that were sent are then processed. Is it possible to launch the request in one process and to process the received chunks in another?

I want to have:

1
Sent to liveview: 1
2
Sent to liveview: 2

instead of:

1
2

Sent to liveview: 1
Sent to liveview: 2

I’m back on my PC, so I will ask a better question.

This is my OpenAI module:

defmodule VetupAI.OpenAI do
  @moduledoc """
  Handles OpenAI API request and response.
  """

  @endpoint "api.openai.com"

  @doc """
  Returns the full response from the OpenAI stream, given the input and options.
  Takes the following options:
  - model
  - max_tokens
  - temperature
  - consume_func, a function that processes the streamed response
  """
  @spec generate_completion(String.t(), pid(), list()) :: {:ok, list()} | {:error, atom()}
  def generate_completion(_input, _pid, opts \\ [])

  def generate_completion("", _pid, _opts), do: {:error, :missing_input}

  def generate_completion(input, pid, _opts) do
    body =
      %{
        model: "gpt-3.5-turbo",
        messages: [%{role: "user", content: input}],
        max_tokens: 3000,
        stream: true,
        temperature: 0.2
      }
      |> Jason.encode_to_iodata!()

    url = url("/v1/chat/completions")

    # {status, headers, body} accumulator for Finch.stream/5.
    acc = {nil, [], []}

    consume_func = fn
      {:status, value}, {_, headers, body} ->
        {value, headers, body}

      {:headers, value}, {status, headers, body} ->
        {status, headers ++ value, body}

      {:data, value}, {status, headers, body} ->
        if value == "data: [DONE]\n\n" do
          {status, headers, body}
        else
          value =
            value
            |> String.replace_prefix("data: ", "")
            |> String.replace_suffix("\n\n", "")
            |> String.split("\n\ndata: ")
            |> Enum.reduce("", fn data, acc ->
              if is_nil(data) || data == "[DONE]" do
                acc
              else
                value = Jason.decode!(data)

                # Default "delta" to %{} (not ""), so the chained Map.get
                # doesn't raise on chunks without a delta.
                value =
                  value["choices"]
                  |> List.first(%{})
                  |> Map.get("delta", %{})
                  |> Map.get("content", "")

                new_response = acc <> value
                IO.inspect(new_response)

                send(pid, {:new_response, new_response})

                new_response
              end
            end)

          {status, headers, [value | body]}
        end
    end

    Finch.build(:post, url, headers(), body)
    |> Finch.stream(VetupAI.Finch, acc, consume_func)
    # |> Finch.stream(VetupAI.Finch, acc, consume_func, [{:receive_timeout, :infinity}])
    |> handle_stream(:generate_completion)
  end

  # url/1, headers/0, and handle_stream/2 are omitted here.
end

And in my liveview:

@impl true
def mount(_params, _session, socket) do
  {:ok,
   socket
   |> assign(:response, "")
   |> assign(:loading, false)
   |> assign(:prompt, "")}
end

@impl true
def handle_params(%{"id" => id}, _, socket) do
  {:noreply,
   socket
   |> assign(:page_title, page_title(socket.assigns.live_action))
   |> assign(:case, Cases.get_case!(id))}
end

@impl true
def handle_event("analyse", params, socket) do
  # Map.get/3 with a default replaces the Map.has_key? case expression.
  part_number = Map.get(params, "part_number", "")

  Process.send(self(), {:generate_completion, part_number}, [])

  {:noreply, assign(socket, :loading, true)}
end

@impl true
def handle_info({:generate_completion, part_number}, socket) do
  pid = self()
  {_case_prompt, _prompt} = VetupAI.Prompt.generatePrompt(socket, part_number)

  response = VetupAI.OpenAI.generate_completion("Count from 1 to 3", pid)
  IO.puts("final answer")
  IO.inspect(response)

  {:noreply, assign(socket, :loading, false)}
end

@impl true
def handle_info({:new_response, value}, socket) do
  IO.puts("new response in liveview")
  IO.inspect(value)

  {:noreply,
   socket
   |> assign(:response, socket.assigns.response <> value)}
end

And I get:

""
"1"
","
" "
"2"
","
" "
"3"
"."
""
final answer
{:ok, ["1", ",", " ", "2", ",", " ", "3", ".", ""]}
new response in liveview
""
new response in liveview
"1"
new response in liveview
","
new response in liveview
" "
new response in liveview
"2"
new response in liveview
","
new response in liveview
" "
new response in liveview
"3"
new response in liveview
"."
new response in liveview
""

So I guess when I say:

    Process.send(self(), {:generate_completion, part_number}, [])

then all the messages sent to this liveview are “stored” (queued in its mailbox) until this handler is done (which is when it receives the final response and returns the {:noreply, socket} tuple).

Is that what is happening? How could I modify the code to make it work?

Well, as I suspected, your “problem” is caused by this:

You are asking your LiveView process to make the request to the API inside handle_info({:generate_completion, part_number}, socket), and inside that, at the same time, you are sending new messages to be handled by this same process. This is the situation I mentioned above, where an Elixir/Erlang process can only handle one message at a time → your process is busy handling the :generate_completion message, and only when it finishes will it handle the :new_response messages.

To fix this you have several alternatives for starting a new process to do the work. I’ll leave it up to you to decide which one is best for your use case.
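For example, one of the simplest options is to run the request in its own process, something like this sketch (in a supervised app, Task.Supervisor.start_child/2 would be preferable to a bare Task.start/1):

def handle_event("analyse", _params, socket) do
  pid = self()

  # Run the request in a separate process so this LiveView stays free to
  # handle the {:new_response, _} messages as they arrive.
  Task.start(fn ->
    VetupAI.OpenAI.generate_completion("Count from 1 to 3", pid)
  end)

  {:noreply, assign(socket, :loading, true)}
end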

OK, perfect, thank you for explaining it and confirming my last thought :slight_smile:

I was thinking about using a GenServer to handle the request, so your answer confirms my thought: one GenServer for every request, sending messages to the liveview to update the interface.

So instead of handling the “generate_completion” message in the liveview process, I would launch a GenServer to make the request and send messages back to the liveview, which would then handle them in real time.

Does that sound correct to you?

Yeah, I think that sounds perfectly fine! (Assuming you won’t have a huge amount of requests that spawn a huge amount of GenServers, so you don’t hammer the API too hard.)

If you want to go a bit down the exploration path, you can also try adding something like a DynamicSupervisor to keep those GenServers under a supervision tree, and/or a pool of processes to make your app more resilient. (Kind of a shameless plug, but I wrote this post some time ago that may help you in the future :sweat_smile:)
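To illustrate the idea, a rough sketch of a one-GenServer-per-request worker (the module name and message shapes here are made up for the example, adapt them to your app):

defmodule VetupAI.CompletionWorker do
  use GenServer

  # One worker per request; lv_pid is the LiveView process to notify.
  def start_link({input, lv_pid}) do
    GenServer.start_link(__MODULE__, {input, lv_pid})
  end

  @impl true
  def init(state) do
    # Defer the request so start_link returns immediately.
    {:ok, state, {:continue, :request}}
  end

  @impl true
  def handle_continue(:request, {input, lv_pid} = state) do
    # generate_completion/2 sends {:new_response, chunk} messages to lv_pid
    # as chunks arrive; the worker stops once the stream is done.
    VetupAI.OpenAI.generate_completion(input, lv_pid)
    {:stop, :normal, state}
  end
end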

Perfect, seems like a good read :slight_smile:

Thank you so much (all of you) for your help. I will first try to “export” the request into a GenServer at a basic level, and maybe add a DynamicSupervisor afterwards when everything works :+1:

Thanks again everyone
