Question regarding send_download, send_file from binary in memory

My requirement is to send_file by range, say: if people just checked first one or two block of video, or get last two blocks of video file, the big video file is from network by many small blocks of it.

I tried send_download, but can only from file or binary, no function for range send…
I also tried send_file and send_resp, but send_file map to underlying OS or maybe Cowboy, send_resp no range send.

My big file is from network maybe hundreds or even thousands blocks, I don’t want to only service some small portion but need to require all blocks over.

Something like send_file: send_file(conn, status, file, offset \ 0, length \ :all) has offset and length, instead from file, can from memory.

Any idea?

1 Like

This question becomes more important now, as the browser triggered 3 requests for the video:

    content_binary = RiakCS.get_file_from_network(file_meta)
    conn
    |> put_resp_content_type(upload.content_type)
    |> put_resp_header("accept-ranges", "bytes")
    |> Plug.Conn.send_resp(:ok, content_binary)

After I added the “accept-ranges”, “bytes” header, the video support fast forward to middle now.

Still original question, any idea?

The answer for sending from memory is using send_resp, exactly as you wrote. If you are handling ranges, then you can implement the proper support for that.

The reason send_file is its own API is only because we don’t want to load the file in memory in the first place - and operating systems have a nice API for that too which send_file builds on top of. But once it is in memory, you can use everything Plug provides to you.

3 Likes

Thank you @josevalim, yes, that actually works, right now I can serve big video smoothly.

Code here:

  def service_video_audio(conn, file_meta_riak, extracted) do
    Common.BlockCache.prepare_video_serving(file_meta_riak)

    block_hash_mapping = RiakCslite.find_block_hash_maping_by_file_meta(extracted.file_meta_common)
    first_block_hash = RiakCslite.find_block_hash_by_index(block_hash_mapping, 0)
    req_range_header = Plug.Conn.get_req_header(conn, "range")
    IO.inspect Integer.to_string(extracted.file_length)
    IO.inspect req_range_header
    IO.inspect "=================="
    res_length_header = Plug.Conn.get_resp_header(conn, "content-length")
    IO.inspect res_length_header, lable: "resp length header:"
    IO.inspect "=================="
    conn =
      conn
      |> Conn.put_resp_content_type(extracted.file_mime_type)
      |> Conn.put_resp_header("accept-ranges", "bytes")
    case has_range_header(req_range_header)  do

      true ->
        case parse_range(req_range_header) do
          {0, 1} ->
            conn
            |> Conn.put_resp_header("content-range", "bytes 0-1/#{Integer.to_string(extracted.file_length)}")
            |> Plug.Conn.send_resp(:partial_content, binary_part(Common.BlockCache.read_block_from_riak_or_redis(first_block_hash), 0, 1))
          {0, -1} ->
            content_range = case extracted.total_block_index == 0 do
              true  -> "bytes 0-#{Integer.to_string(extracted.file_length - 1)}/#{Integer.to_string(extracted.file_length)}"
              false -> "bytes 0-#{Integer.to_string(@one_mb - 1)}/#{Integer.to_string(extracted.file_length)}"
            end
            conn
            |> Conn.put_resp_header("content-range", content_range)
            |> Plug.Conn.send_resp(:partial_content, Common.BlockCache.read_block_from_riak_or_redis(first_block_hash))
          {range_start, -1} ->
            if extracted.file_length - range_start > @two_mb do
              num_in_index = div(range_start, @one_mb)
              part_send = compose_parts(extracted, block_hash_mapping,range_start, num_in_index, -1)
              conn
              |> Conn.put_resp_header("content-range", "bytes #{Integer.to_string(range_start)}-#{Integer.to_string(range_start + @one_mb - 1)}/#{Integer.to_string(extracted.file_length)}")
              |> Plug.Conn.send_resp(:partial_content, part_send)
            else
              num_in_index = div(range_start, @one_mb)
              part_send =
                cond do
                  extracted.total_block_index - num_in_index == 2 ->
                    compose_parts(extracted, block_hash_mapping,range_start, num_in_index, 2)
                  extracted.total_block_index - num_in_index == 1 ->
                    compose_parts(extracted, block_hash_mapping,range_start, num_in_index, 1)
                  extracted.total_block_index - num_in_index == 0 ->
                    compose_parts(extracted, block_hash_mapping,range_start, num_in_index, 0)
                end
              conn
              |> Conn.put_resp_header("content-range", "bytes #{Integer.to_string(range_start)}-#{Integer.to_string(extracted.file_length - 1)}/#{Integer.to_string(extracted.file_length)}")
              |> Plug.Conn.send_resp(:partial_content, part_send)
            end

          {range_start, range_end} -> # todo impl like open end
            if range_end - range_start > @two_mb do
              num_in_index = div(range_start, @one_mb)
              part_send = compose_parts(extracted, block_hash_mapping,range_start, num_in_index, -1)
              conn
              |> Conn.put_resp_header("content-range", "bytes #{Integer.to_string(range_start)}-#{Integer.to_string(range_start + @one_mb - 1)}/#{Integer.to_string(extracted.file_length)}")
              |> Plug.Conn.send_resp(:partial_content, part_send)
            else
              start_num_in_index = div(range_start, @one_mb)
              end_num_in_index = div(range_end, @one_mb)
              part_send =
                cond do
                  end_num_in_index - start_num_in_index == 2 ->
                    compose_parts(extracted, block_hash_mapping,range_start, start_num_in_index, range_end, end_num_in_index, 2)
                  end_num_in_index - start_num_in_index == 1 ->
                    compose_parts(extracted, block_hash_mapping,range_start, start_num_in_index, range_end, end_num_in_index, 1)
                  end_num_in_index - start_num_in_index == 0 ->
                    compose_parts(extracted, block_hash_mapping,range_start, start_num_in_index, range_end, end_num_in_index, 0)
                end
              conn
              |> Conn.put_resp_header("content-range", "bytes #{Integer.to_string(range_start)}-#{Integer.to_string(range_end)}/#{Integer.to_string(extracted.file_length)}")
              |> Plug.Conn.send_resp(:partial_content, part_send)

            end
        end

      _ ->
        conn
        # |> Plug.Conn.send_resp(:ok, content_binary)
            |> Conn.put_resp_header("content-range", "bytes 0-#{Integer.to_string(@one_mb - 1)}/#{Integer.to_string(extracted.file_length)}")
            |> Plug.Conn.send_resp(:partial_content, Common.BlockCache.read_block_from_riak_or_redis(first_block_hash) )
    end
  end



  defp has_range_header(req_range_header) do
    IO.inspect req_range_header
    case req_range_header do
      [_|_] -> true  # TODO think about multi ranges
      [] -> false
      _  -> false
    end

  end

  defp parse_range(req_range_header) do
    [x | _] = req_range_header
    range = x |> String.trim_leading("bytes=")
    [range_start , range_end] = String.split(range, "-")

    case range_end == "" do
      true -> {String.to_integer(range_start), -1}
      false -> {String.to_integer(range_start), String.to_integer(range_end)}

    end

  end

  defp compose_parts(extracted, block_hash_mapping,range_start, num_in_index, -1) do
    part_1 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, num_in_index)
      |> BlockCache.read_block_from_riak_or_redis()
    part_2 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, num_in_index + 1)
      |> BlockCache.read_block_from_riak_or_redis()
    start_point_in_block = rem(range_start, @one_mb)
    Task.async(fn ->
      RiakCslite.find_block_hash_by_index(block_hash_mapping, num_in_index + 2)
      |> BlockCache.read_block_from_riak_or_redis()
    end)
    binary_part(part_1 <> part_2, start_point_in_block, @one_mb)

  end

  defp compose_parts(extracted, block_hash_mapping,range_start, num_in_index, 0) do

    part_1 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, num_in_index)
      |> BlockCache.read_block_from_riak_or_redis()
    start_point_in_block = rem(range_start, @one_mb)
    binary_part(part_1, start_point_in_block, extracted.file_length - range_start)
  end

  defp compose_parts(extracted, block_hash_mapping,range_start, num_in_index, 1) do
    part_1 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, num_in_index)
      |> BlockCache.read_block_from_riak_or_redis()
    part_2 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, extracted.total_block_index)
      |> BlockCache.read_block_from_riak_or_redis()
    start_point_in_block = rem(range_start, @one_mb)
    binary_part(part_1 <> part_2, start_point_in_block, extracted.file_length - range_start)

  end

  defp compose_parts(extracted, block_hash_mapping,range_start, num_in_index, 2) do
    part_1 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, num_in_index)
      |> BlockCache.read_block_from_riak_or_redis()
    part_2 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, num_in_index + 1)
      |> BlockCache.read_block_from_riak_or_redis()
    part_3 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, extracted.total_block_index)
      |> BlockCache.read_block_from_riak_or_redis()
    start_point_in_block = rem(range_start, @one_mb)
    binary_part(part_1 <> part_2 <> part_3, start_point_in_block, extracted.file_length - range_start)
  end

  defp compose_parts(extracted, block_hash_mapping,range_start, start_num_in_index, range_end, end_num_in_index, 2)  do
    part_1 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, start_num_in_index)
      |> BlockCache.read_block_from_riak_or_redis()
    part_2 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, start_num_in_index + 1)
      |> BlockCache.read_block_from_riak_or_redis()
    part_3 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, end_num_in_index)
      |> BlockCache.read_block_from_riak_or_redis()
    start_point_in_block = rem(range_start, @one_mb)
    end_point_in_block = rem(range_end, @one_mb)
    binary_part(part_1 <> part_2 <> part_3, start_point_in_block, end_point_in_block + @two_mb - start_point_in_block + 1)
  end

  defp compose_parts(extracted, block_hash_mapping,range_start, start_num_in_index, range_end, end_num_in_index, 1)  do
    part_1 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, start_num_in_index)
      |> BlockCache.read_block_from_riak_or_redis()
    part_2 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, end_num_in_index)
      |> BlockCache.read_block_from_riak_or_redis()
    start_point_in_block = rem(range_start, @one_mb)
    end_point_in_block = rem(range_end, @one_mb)
    binary_part(part_1 <> part_2, start_point_in_block, end_point_in_block + @one_mb - start_point_in_block + 1)
  end

  defp compose_parts(extracted, block_hash_mapping,range_start, start_num_in_index, range_end, end_num_in_index, 0)  do
    part_1 =
      RiakCslite.find_block_hash_by_index(block_hash_mapping, start_num_in_index)
      |> BlockCache.read_block_from_riak_or_redis()
    start_point_in_block = rem(range_start, @one_mb)
    end_point_in_block = rem(range_end, @one_mb)
    binary_part(part_1, start_point_in_block, end_point_in_block - start_point_in_block + 1)
  end
3 Likes