Add Native streamed http functionality

This commit is contained in:
kwaroran
2024-03-09 05:53:30 +09:00
parent 633a224bb6
commit 31da44d310
4 changed files with 192 additions and 33 deletions

View File

@@ -1,5 +1,11 @@
package co.aiclient.risu;
import android.os.Bundle;
import com.getcapacitor.BridgeActivity;
public class MainActivity extends BridgeActivity {}
public class MainActivity extends BridgeActivity {
@Override
public void onCreate(Bundle savedInstanceState) {
registerPlugin(StreamedPlugin.class);
super.onCreate(savedInstanceState);
}
}

View File

@@ -1,2 +1,119 @@
package co.aiclient.risu;public class StreamedPlugin {
package co.aiclient.risu;
import android.util.Base64;
import com.getcapacitor.JSObject;
import com.getcapacitor.Plugin;
import com.getcapacitor.PluginCall;
import com.getcapacitor.PluginMethod;
import com.getcapacitor.annotation.CapacitorPlugin;
import org.json.JSONException;
import org.json.JSONObject;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Iterator;
@CapacitorPlugin(name = "streamedFetch")
public class StreamedPlugin extends Plugin {
@PluginMethod()
public void streamedFetch(PluginCall call) {
String id = call.getString("id");
String urlParam = call.getString("url");
String bodyString = call.getString("body");
JSObject headers = call.getObject("headers");
URL url = null;
try {
url = new URL(urlParam);
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
byte[] bodyEncodedByte = bodyString.getBytes("UTF-8");
byte[] bodyByte = Base64.decode(bodyEncodedByte, Base64.DEFAULT);
Iterator<String> keys = headers.keys();
urlConnection.setRequestMethod("POST");
while(keys.hasNext()) {
String key = keys.next();
if (headers.get(key) instanceof JSONObject) {
urlConnection.setRequestProperty(key, headers.getString(key));
}
}
urlConnection.setRequestProperty("Content-Length", String.valueOf(bodyByte.length));
urlConnection.setDoInput(true);
OutputStream out = new BufferedOutputStream(urlConnection.getOutputStream());
out.write(bodyByte);
try {
InputStream in = new BufferedInputStream(urlConnection.getInputStream());
int resCode = urlConnection.getResponseCode();
JSObject resObj = new JSObject();
JSObject headerObj = new JSObject();
resObj.put("id", id);
resObj.put("type", "headers");
resObj.put("status", resCode);
int i = 0;
while (true){
String headerName = urlConnection.getHeaderFieldKey(i);
String headerValue = urlConnection.getHeaderField(i);
i++;
if(headerValue == null){
break;
}
if(headerName == null){
continue;
}
headerObj.put(headerName, headerValue);
}
resObj.put("body", headerObj);
notifyListeners("streamed_fetch", resObj);
while (true){
int ableBytes = in.available();
byte[] buf = new byte[ableBytes];
int bytesRead = in.read(buf, 0, ableBytes);
if(bytesRead == -1){
break;
}
byte[] encodedBuf = Base64.encode(buf, Base64.DEFAULT);
JSObject obj = new JSObject();
obj.put("id", id);
obj.put("body", encodedBuf);
obj.put("type", "chunk");
notifyListeners("streamed_fetch", obj);
}
JSObject endObj = new JSObject();
endObj.put("id", id);
endObj.put("type", "end");
notifyListeners("streamed_fetch", endObj);
} finally {
urlConnection.disconnect();
}
} catch (IOException e) {
JSObject obj = new JSObject();
obj.put("error", String.valueOf(e));
call.resolve(obj);
return;
} catch (JSONException e) {
JSObject obj = new JSObject();
obj.put("error", String.valueOf(e));
call.resolve(obj);
return;
}
JSObject ret = new JSObject();
ret.put("success", true);
call.resolve(ret);
}
}

View File

@@ -7,7 +7,7 @@ buildscript {
mavenCentral()
}
dependencies {
classpath 'com.android.tools.build:gradle:8.0.0'
classpath 'com.android.tools.build:gradle:8.1.3'
classpath 'com.google.gms:google-services:4.3.15'
// NOTE: Do not place your application dependencies here; they belong

View File

@@ -28,6 +28,7 @@ import * as CapFS from '@capacitor/filesystem'
import { save } from "@tauri-apps/api/dialog";
import type { RisuModule } from "../process/modules";
import { listen } from '@tauri-apps/api/event'
import { registerPlugin } from '@capacitor/core';
//@ts-ignore
export const isTauri = !!window.__TAURI__
@@ -1281,7 +1282,7 @@ export class LocalWriter{
}
let fetchIndex = 0
let tauriNativeFetchData:{[key:string]:StreamedFetchChunk[]} = {}
let nativeFetchData:{[key:string]:StreamedFetchChunk[]} = {}
interface StreamedFetchChunkData{
type:'chunk',
@@ -1302,19 +1303,39 @@ interface StreamedFetchEndData{
}
type StreamedFetchChunk = StreamedFetchChunkData|StreamedFetchHeaderData|StreamedFetchEndData
let streamedFetchListening = false
interface StreamedFetchPlugin {
streamedFetch(options: { id: string, url:string, body:string, headers:{[key:string]:string} }): Promise<{"error":string,"success":boolean}>;
addListener(eventName: 'streamed_fetch', listenerFunc: (data:StreamedFetchChunk) => void): void;
}
let streamedFetchListening = false
let capStreamedFetch:StreamedFetchPlugin|undefined
if(isTauri){
listen('streamed_fetch', (event) => {
try {
const parsed = JSON.parse(event.payload as string)
const id = parsed.id
tauriNativeFetchData[id]?.push(parsed)
nativeFetchData[id]?.push(parsed)
} catch (error) {
console.error(error)
}
}).then((v) => {
streamedFetchListening = true
})
}
if(Capacitor.isNativePlatform()){
capStreamedFetch = registerPlugin<StreamedFetchPlugin>('CapacitorHttp', CapacitorHttp)
capStreamedFetch.addListener('streamed_fetch', (data) => {
try {
nativeFetchData[data.id]?.push(data)
} catch (error) {
console.error(error)
}
})
streamedFetchListening = true
}
export async function fetchNative(url:string, arg:{
body:string,
@@ -1326,7 +1347,7 @@ export async function fetchNative(url:string, arg:{
let headers = arg.headers ?? {}
const db = get(DataBase)
let throughProxi = (!isTauri) && (!isNodeServer) && (!db.usePlainFetch) && (!Capacitor.isNativePlatform())
if(isTauri){
if(isTauri || Capacitor.isNativePlatform()){
fetchIndex++
if(arg.signal && arg.signal.aborted){
throw new Error('aborted')
@@ -1335,13 +1356,14 @@ export async function fetchNative(url:string, arg:{
fetchIndex = 0
}
let fetchId = fetchIndex.toString().padStart(5,'0')
tauriNativeFetchData[fetchId] = []
nativeFetchData[fetchId] = []
let resolved = false
let error = ''
while(!streamedFetchListening){
await sleep(100)
}
if(isTauri){
invoke('streamed_fetch', {
id: fetchId,
url: url,
@@ -1354,15 +1376,29 @@ export async function fetchNative(url:string, arg:{
resolved = true
}
})
}
else if(capStreamedFetch){
capStreamedFetch.streamedFetch({
id: fetchId,
url: url,
headers: headers,
body: Buffer.from(arg.body).toString('base64'),
}).then((res) => {
if(!res.success){
error = res.error
resolved = true
}
})
}
let resHeaders:{[key:string]:string} = null
let status = 400
const readableStream = new ReadableStream<Uint8Array>({
async start(controller) {
while(!resolved || tauriNativeFetchData[fetchId].length > 0){
if(tauriNativeFetchData[fetchId].length > 0){
const data = tauriNativeFetchData[fetchId].shift()
while(!resolved || nativeFetchData[fetchId].length > 0){
if(nativeFetchData[fetchId].length > 0){
const data = nativeFetchData[fetchId].shift()
console.log(data)
if(data.type === 'chunk'){
const chunk = Buffer.from(data.body, 'base64')