Theta V live streaming using Pi 3

livestreaming
raspberrypi

#1


I don't have a Theta V on hand at the moment. I have successfully live streamed from a Pi 3 using a Theta S. Have you tried using a Theta V with a Pi 3? You said the kernel does not support UVC 1.5. Is it possible to connect a Theta V to the Pi and stay on UVC 1.0 for low-resolution live streaming? I would like to know whether I can change the camera in the future without changing the Pi 3 or the software.


#2

The THETA S works with the Raspberry Pi without any problems.

The THETA V will not work with the Raspberry Pi as far as we know. There is no way to use UVC 1.0 with the THETA V.

This thread suggests a way to get a Raspberry Pi to work with UVC 1.5. However, there has been no reported success.

People have reported success with the THETA V and the Android OS, which also uses the Linux kernel.

Someone may have it working with a Raspberry Pi right now using the UVCCamera driver. However, we don’t have verification. If you get it working with a UVC 1.5 camera, please report back and ideally share the driver code. :slight_smile:


#3

It is possible to get pseudo live streaming with a Theta V and an Rpi on Raspbian. The quality is not optimal: a Python program uses camera.getLivePreview to get an MJPEG stream, with the Theta V connected over Wi-Fi in AP or client mode. This solution serves many clients (the Python program on the Rpi relays the MJPEG stream) and displays the video in a browser with three.js. I will post my code here soon; I just need to clean it up.
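As a rough illustration of the relay idea (not the code described above), an MJPEG stream can be cut into individual frames by looking for the JPEG start and end markers. The function name `extract_frames` is hypothetical, and the sketch assumes the preview frames carry no embedded thumbnails, which would contain nested markers.

```python
JPEG_SOI = b"\xff\xd8"  # JPEG start-of-image marker
JPEG_EOI = b"\xff\xd9"  # JPEG end-of-image marker


def extract_frames(buffer):
    """Split a byte buffer into complete JPEG frames.

    Returns (frames, leftover), where leftover holds the bytes of a frame
    that has started but not finished yet.
    """
    frames = []
    while True:
        start = buffer.find(JPEG_SOI)
        if start == -1:
            # No frame start yet; keep the last byte in case the
            # two-byte marker is split across two network chunks.
            return frames, buffer[-1:]
        end = buffer.find(JPEG_EOI, start + 2)
        if end == -1:
            # Frame started but is incomplete; keep it for the next call.
            return frames, buffer[start:]
        frames.append(buffer[start:end + 2])
        buffer = buffer[end + 2:]
```

Each network chunk is appended to the leftover from the previous call, so a frame that spans several chunks is returned once its end marker arrives.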

For now, I’m looking for a way to have both video recording and streaming at the same time (live preview quality is enough). The Theta V is used on a small rover (a good RC car controlled by the Rpi) in sewer pipes. Have a look: https://youtu.be/cFh2awHrH20

I’m French, so my English is far from good…
Hugues


#4

Thank you for this contribution. Your English is great. This is an international community with people from all over the world.

Your project looks very interesting.

If you can’t easily figure out how to save the output of camera.getLivePreview to disk, you could also get a used THETA V or THETA S and attach a second camera onto the RC car for data storage. It’s not ideal, but it’s a quick way to give your client a video file to point out the problems with the pipe you’re inspecting…


#5

Oh, wow, that video is cool! This appears to be a really great use of a 360 camera, since you can inspect the inside of the sewer pipe in all directions with only one camera.

What do you normally use this to look for? Cracks? Other problems with the pipes? Do you not need greater resolution than camera.getLivePreview provides?

Wild seeing how you turn the RC car around (at about 14:00 in the video) and then seeing the sewer cover slide shut (at about 15:00 in the video)!

:crazy_face:


#6

@jcasman, I thought it was cool too. :slight_smile:

It would be nice to figure out how to save the MotionJPEG stream to a file. I think he’s using Python on a Raspberry Pi. I’m not sure how to both save and transmit the stream. Maybe he could write a simple client socket program in Python and have both that client and the clients he mentions access the same stream. He mentions that the current program can handle multiple clients, so it should be able to serve one more client that saves the stream to disk.

https://docs.python.org/3/howto/sockets.html

This would seem to be a good business in many countries. The ability to inspect the pipe thoroughly is incredible.


#7

I’m very interested in your work here, and I hope to use your code for a different application. Maybe we could speak directly about this project, though I would hope to use a Raspberry Pi Zero W if possible. The video looks great to me! Thanks so much for the contribution. I will be eagerly following your progress!


#8

Here is my code. It is not cleaned up yet and I may have made some mistakes, but if you know Python, this should be OK. This should work on a Pi Zero W; the CPU usage is about 20% on a Pi 3B+.
The Theta (latest firmware) is connected in client mode. The code uses arp-scan to find the Theta on the network (not sure it’s the best way, but it works). In this mode you need to use HTTPDigestAuth.
The code is divided into 2 files: one for connection, parameters, and file copy, and one for the streaming process.
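To illustrate the discovery step on its own, here is a small sketch that picks the camera's IP address out of arp-scan output. It assumes arp-scan prints one tab-separated row per host (IP, MAC, vendor); the function name `find_theta_ip` and the sample rows, including the MAC addresses, are made up for the example.

```python
def find_theta_ip(arp_scan_output, vendor="RICOH"):
    """Return the IP of the first host whose vendor column mentions `vendor`.

    Returns None when no matching host is listed.
    """
    for line in arp_scan_output.splitlines():
        fields = line.split("\t")
        # Header and summary lines have no tab-separated vendor column.
        if len(fields) >= 3 and vendor.lower() in fields[2].lower():
            return fields[0].strip()
    return None


# Made-up sample in the assumed arp-scan layout.
SAMPLE_OUTPUT = (
    "Interface: wlan0, datalink type: EN10MB (Ethernet)\n"
    "10.0.1.12\taa:bb:cc:00:00:01\tExample Router Inc.\n"
    "10.0.1.23\taa:bb:cc:00:00:02\tRICOH COMPANY,LTD.\n"
)

print(find_theta_ip(SAMPLE_OUTPUT))
```

Matching on the vendor column only, rather than on the whole line, avoids a false hit if another field happens to contain the same text.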

ThetaV1.py

import threading, sys, time, subprocess
import tty, termios, socket
import requests,json
from threading import Thread
from requests.auth import HTTPDigestAuth
from multiprocessing import Process
from multiprocessing import Value

sys.path.append('/home/pi/cyclope')
from ThetaStreamServerV1 import ThetaStreamServer
from ThetaStreamServerV1 import ThetaHandler
from RepertoireV1 import Repertoire
	
	
class Theta(Thread):

	HEADERS = {'content-type': 'application/json'}
	ALERTES = ["Caméra 360 non connectée"]
	ALERTES_COPIE = ["Traitement fichier 360 en cours"]
	AUTH = HTTPDigestAuth('THETAYL00108440', '00108440')
	ARP = "sudo arp-scan -I wlan0 10.0.1.10-10.0.1.50"
	RICOH = "RICOH COMPANY"
	FICHIER_CORRESPONDANCE = "/home/pi/cyclope/fichiersTheta.json"
	
	def __init__(self,tempsBoucle=3,tempsReconnexion=3):
		
		super(Theta, self).__init__()
		self.stopRequest = threading.Event()
		self.tempsBoucle = tempsBoucle
		self.tempsReconnexion = tempsReconnexion
		self.adresseIp = ""
		self.alertes = Theta.ALERTES
		self.niveauBatterie = 0.
		self.fichiersThetaJson = json.loads(' { "fichiers" : [] }')
		self.recordingIndicator = False
		self.nomFichier = ""
		self.list = []
		""" Theta modes:
			0 : IP address not found
			1 : IP found, stream server started, but no active connection
			2 : idle, live preview allowed
			3 : recording
			4 : transferring a file
		"""
		self.mode = Value('i',0)
		self.lireFichiersTheta()
		
		self.methodeSend360 = "0"
		self.listefichiersSend360 = ""

	def getAlertes(self):
		return self.alertes
		
	def getMode(self):
		return self.mode.value

	def getAdresseIp(self):
		output = (subprocess.check_output(Theta.ARP, shell=True )).decode("utf-8").split("\n")
		for line in output:
			if (line.find(Theta.RICOH) != -1):
				self.adresseIp = line.split()[0]
				with self.mode.get_lock():
					self.mode.value = 1
				self.startLive()
					
					
	def _live(self):	
		print("_live")
		serveurLive =  ThetaStreamServer(('',9091),ThetaHandler)	
		serveurLive.initChargement(self.adresseIp,self.mode)
		serveurLive.serve_forever()


	def startLive(self):
		try:
			print("startLive")
			self.p = Process(target=self._live)
			self.p.start()
		except Exception as err :
			print("EXCEPTION startLive")
			print(err)
			pass

	def setOptions1(self):
		url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))		
		body = json.dumps({"name": "camera.setOptions",	"parameters": {	"options": {
							"captureMode": "video",	"sleepDelay": 1200, "offDelay": 600, "videoStitching" : "none",
								"_microphoneChannel": "1ch", "_gain" : "mute",	"_shutterVolume" : 100,
								"previewFormat" : {"width": 1024, "height": 512, "framerate": 8}}}})
		try:
			req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=3)
		except Exception:
			pass

	def setOptions2(self):
		url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))
		
		body = json.dumps({"name": "camera.setOptions",	"parameters": {	"options": {							
								"fileFormat": {"type": "mp4", "width": 1920, "height": 960, "_codec": "H.264/MPEG-4 AVC"},
								"_maxRecordableTime" : 1500, "_bitrate" : "Normal",	"whiteBalance": "auto"	}}})
		try:
			req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=3)
		except Exception:
			pass				
	

	def startRecording(self,nomFichier):		
		if (self.mode.value == 2):
			url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))		
			body = json.dumps({	"name": "camera.startCapture"})
			try:
				req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=3)
				self.recordingIndicator = True
				self.nomFichier = nomFichier
				with self.mode.get_lock():
					self.mode.value = 3
			except Exception:
				pass
			
			

	def stopRecording(self):
		if (self.mode.value == 3):
			url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))		
			body = json.dumps({"name": "camera.stopCapture"})
			try:
				req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=3)
				print(req.content.decode('UTF-8'))
				with self.mode.get_lock():
					self.mode.value = 2
			except Exception:
				pass		

			
	def getState(self):
		url = "".join(("http://", self.adresseIp, ":80/osc/state"))
		print ("THETA getState\n")
		try:
			req = requests.post(url,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=10)
			if (req.status_code == 200):
				if (self.mode.value == 1):
					self.setOptions1()
					self.setOptions2()			
					with self.mode.get_lock():
						self.mode.value = 2
					
			
				donneeJson = json.loads(req.content.decode('UTF-8'))
				self.niveauBatterie = donneeJson["state"]["batteryLevel"]
				#TODO internal errors + recording-time alert
				self.alertes = []
				print("req.status_code 200")
				#if (self.mode.value == 4):
				#	print("Theta.ALERTES_COPIE")
				#	self.alertes=Theta.ALERTES_COPIE
					
				capture = donneeJson["state"]["_captureStatus"]
				fileUrl = donneeJson["state"]["_latestFileUrl"]
				recordableTime= donneeJson["state"]["_recordableTime"]
				print("recordableTime")
				print(recordableTime)
				print("capture")
				print(capture)
				print(fileUrl)
				if ((self.recordingIndicator == True) and (capture == "idle")):	
					print(fileUrl)
					fileUrlSplit=fileUrl.split("/")

					self.ajoutFichiersTheta(fileUrlSplit[-1])
					self.nomFichier = ""
					self.recordingIndicator = False
				#if (capture == "shooting"):

		
		except Exception as err:
				print ("THETA getState Exception \n")
				print(err)
				with self.mode.get_lock():
					self.mode.value = 1
				self.alertes = Theta.ALERTES
				self.niveauBatterie = 0.
	
			
			

	
			
			
	def lireFichiersTheta(self):
		print("lireFichiersTheta")
		date = time.strftime("%Y-%m-%d",time.localtime(time.time()-60*60*24*30))
		try:
			with open(Theta.FICHIER_CORRESPONDANCE, "rt") as fichier:
				listeAvantTri = fichier.read()
				listeAvantTriJson = json.loads(listeAvantTri)
				for chunk in listeAvantTriJson["fichiers"]:
					if (chunk["date"] > date):
						self.fichiersThetaJson["fichiers"].append(chunk)			
		except Exception:
			print("Exception lireFichiersTheta")
			pass
			
			
	def ecrireFichiersTheta(self):
		print("ecrireFichiersTheta")
		try:
			with open(Theta.FICHIER_CORRESPONDANCE, "wt") as fichier:
				#fichier.write("[ ")
				#fichier.write(",".join(self.fichiersThetaJson))
				#fichier.write("]")		
				fichier.write( json.dumps(self.fichiersThetaJson))
		except Exception:
			print("Exception ecrireFichiersTheta")
			pass	
	
	
	def ajoutFichiersTheta(self,name):
		print("ajoutFichiersTheta")
		print(name)
		date = time.strftime("%Y-%m-%d",time.localtime())
		#ajout = json.dumps({ "name" : name , "nom" : self.nomFichier , "date" : date })
		ajout = { "name" : name , "nom" : self.nomFichier, "date" :  date  }
		if (self.nomFichier != ''):
			self.fichiersThetaJson["fichiers"].append(ajout)
			self.ecrireFichiersTheta()
			




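	def join (self, timeout=2):
		# Signal the run() loop to stop before waiting on anything
		self.stopRequest.set()
		# The stream-server process only exists once startLive() has run
		p = getattr(self, "p", None)
		if p is not None:
			p.join(timeout)
		super(Theta, self).join(timeout)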
			

	def listFiles(self,targetDir,espaceDisponible):
		print("listFiles")
		print(targetDir)
		print((espaceDisponible))
		listeFichiersJson = json.loads(' { "fichiers" : [], "espaceDisponible" : "", "message" : "" }')
		listeFichiers = ""
		
		message = ""
		limit = 0
		
		if (targetDir == Repertoire.CHEMIN_CARTE):
			message = "Clé USB non présente"
			limit = 1
		
		
		
		if (self.mode.value == 2):
			url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))
			body = json.dumps({"name": "camera.listFiles",
						"parameters": {
								"fileType": "all",
								"entryCount": 128,
								"_detail" : False,
								"maxThumbSize": 0
						}
			})
			try:
				req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=3)
				listeFichiers = req.content.decode('UTF-8')				
				entries = json.loads(listeFichiers)["results"]["entries"]
				self.list = entries
				for chunck in entries:
						name = chunck["name"]
						info = chunck["name"] + " (" + str(int(chunck["size"]/1000000)) + " Mo)"
						for chunck2 in self.fichiersThetaJson["fichiers"]:
							if (name == chunck2["name"]):
								info = chunck2["nom"] + " (" + str(int(chunck["size"]/1000000)) + " Mo)"
							
						ajout = { "name" : name , "info" : info, "size" : int(chunck["size"]/1000000) }
						listeFichiersJson["fichiers"].append(ajout)
			except Exception:
				pass		
					
		else:
			limit=2
			message = "Caméra 360 non disponible"
		
		print("listFiles2")
		listeFichiersJson["espaceDisponible"]=int(espaceDisponible)/1000 -1
		listeFichiersJson["message"]=message
		listeFichiersJson["limit"]=limit
		print("listFiles3")
		return json.dumps(listeFichiersJson)


	def sendFichiers360(self,methode,listefichiers):
		self.methodeSend360 = methode
		self.listefichiersSend360 = listefichiers


	def _sendFichiers360(self):
		print("sendFichiers360")
		methode= self.methodeSend360
		self.methodeSend360 = "0"
		print(methode)
		print(self.methodeSend360)
		self.alertes=Theta.ALERTES_COPIE
		for x in self.listefichiersSend360.split("_"):
			fileUrl = ""
			nomFichierDestination = x
			for y in self.list:
				if (y["name"] == x):
					fileUrl= y["fileUrl"]
			for z in self.fichiersThetaJson["fichiers"]:
				if (z["name"] == x):
					nomFichierDestination= z["nom"]
			
			if (fileUrl != ""):
				if (methode == "1"):				
					self.deleteFile(fileUrl)
				if (methode == "2"):
					print(nomFichierDestination)
					self.getFile(fileUrl,Repertoire.CHEMIN_CLE,nomFichierDestination)
				if (methode == "3"):	
					result = self.getFile(fileUrl,Repertoire.CHEMIN_CLE,nomFichierDestination)
					if (result == 0):
						self.deleteFile(fileUrl)



	def deleteFile(self,fileUrl):
		print ("deleteFile")
		print (fileUrl)
		fileUrls = []
		fileUrls.append(fileUrl)
		url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))
		body = json.dumps({"name": "camera.delete",
						"parameters": {
								"fileUrls": 	fileUrls		}
			})
		try:
			req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=5)	
			print (str(req.status_code))
		except Exception:
			pass
		
	def getFile(self,fileUrl,repertoireDestination,nomFichierDestination):
		print("getFile")
		print("fileUrl:" + fileUrl)
		print("repertoireDestination:" + repertoireDestination)
		print("nomFichierDestination:" + nomFichierDestination)
		exit_code = 0
		sequence = (repertoireDestination,"/",nomFichierDestination)
		fichier = "".join(sequence)
		
		print ("fichier:"  + fichier)
		
		if (self.mode.value == 2):
			with self.mode.get_lock():
					self.mode.value = 4
					
			#self.alertes = Theta.ALERTES_COPIE
			try:
				print ("getFile : " + fileUrl + " vers " + fichier)
				response = requests.get(fileUrl,headers=Theta.HEADERS,auth=Theta.AUTH,stream=True,timeout=5)
				if response.status_code == 200:
					# "with" closes the file even if the download fails part-way
					with open(fichier, "wb") as mon_fichier:
						for block in response.iter_content(chunk_size=100000):
							if block:
								mon_fichier.write(block)
				else:
					# Report failure so the caller does not delete the original from the camera
					print("getFile status_code: " + str(response.status_code))
					exit_code = 1
				#self.alertes = []
			except Exception as err:
				print("Exception getFile")
				print (err)
				#self.alertes = ["Exception getFile"]
				exit_code = 1
			
			with self.mode.get_lock():
					self.mode.value = 2
			return exit_code
		# Camera not idle: nothing was copied, so report failure
		return 1
			
	
	
	def run(self) :		
		while not self.stopRequest.is_set():		
			if (self.mode.value == 0):
				self.getAdresseIp()	
		
			if (self.mode.value > 0):
				self.getState()
				if (self.methodeSend360 != "0"):
					self._sendFichiers360()

					
			if (self.mode.value == 0):
				time.sleep(self.tempsReconnexion)
			else:
				time.sleep(self.tempsBoucle)

ThetaStreamServerV1.py

import threading, sys, time, subprocess
import tty, termios, socket
import requests,json
from threading import Thread
from requests.auth import HTTPDigestAuth
from multiprocessing import Process
from multiprocessing import Value
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn

sys.path.append('/home/pi/cyclope')

class ThetaStreamServer(ThreadingMixIn, HTTPServer):

	def initChargement(self,adresseIp,mode):
		self.nbClients = Value('i',0)
		self.thetaGetLivePreview = ThetaGetLivePreview(adresseIp,mode,self.nbClients)
		self.mode = mode
		self.thetaGetLivePreview.start()
		
class ThetaHandler(BaseHTTPRequestHandler):

	def do_GET(self):
		print("do_GET")
		with self.server.nbClients.get_lock():
			self.server.nbClients.value += 1
			print("nbClients :" + str(self.server.nbClients.value))
		try:
			time.sleep(1)
			self.send_response(200)
			self.send_header('Access-Control-Allow-Origin', '*')
			self.send_header('Content-type', 'multipart/x-mixed-replace; boundary=--osclive')
			self.send_header("Pragma-directive", "no-cache")
			self.send_header("Cache-directive", "no-cache")
			#self.send_header("Cache-control", "no-cache")
			self.send_header("Cache-control", "no-cache, no-store, must-revalidate")
			self.send_header("Pragma", "no-cache")
			#self.send_header("Expires", "0")

			image=b''
			timeStampImage = ''
			
			#while (self.server.mode.value == 2 ):
			# Loop until the client disconnects, which raises an exception on write
			while True:
				if (timeStampImage != self.server.thetaGetLivePreview.timeStampImage):
					#print("NEW IMAGE SENT")
					timeStampImage = self.server.thetaGetLivePreview.timeStampImage
					self.end_headers()
					image= self.server.thetaGetLivePreview.image
					# One multipart part per frame: boundary, part headers, JPEG bytes
					self.wfile.write('--osclive'.encode())
					self.end_headers()
					self.send_header('Content-type', 'image/jpeg')
					self.send_header('Content-length', len(image))
					self.end_headers()
					self.wfile.write(image)
				time.sleep(0.1)
		except Exception as err:
			print("Exception do_GET")
			print(err)
		finally:
			# Always release this client's slot, however the request ends
			with self.server.nbClients.get_lock():
				self.server.nbClients.value = self.server.nbClients.value - 1
		

		

class ThetaGetLivePreview(Thread):

	def __init__(self, adresseIp,mode,nbClients):
		print("init ThetaGetLivePreview")
		super(ThetaGetLivePreview, self).__init__()
		self.stopRequest = threading.Event()
		self.adresseIp = adresseIp
		self.image=b''
		self.timeStampImage=time.time()
		self.isChargingImage = False
		self.mode = mode
		self.nbClients = nbClients

	def _appel(self):
		print("_appel ThetaGetLivePreview")
		print(str(self.mode.value))
		
		url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))
		body = json.dumps({"name": "camera.getLivePreview"})
		try:
			response = requests.post(url, data=body, headers={'content-type': 'application/json'},auth=HTTPDigestAuth('THETAYL00108440', '00108440'), stream=True,timeout=5)
			print("chargement _appel - response")
			if response.status_code == 200:
				self.isChargingImage = True
				# Raw bytes of the MJPEG stream received since the last complete frame
				buffer = b''
				for block in response.iter_content(chunk_size=10000):
					if (self.nbClients.value == 0):
						# No viewer left: stop pulling the preview from the camera
						print("ARRET _appel")
						break
					
					buffer = buffer + block
					
					# Search the buffer for the JPEG start and end markers
					a = buffer.find(b'\xff\xd8')
					b = buffer.find(b'\xff\xd9', a + 2) if a != -1 else -1

					# If a complete JPEG is available, publish it to the handlers
					if a != - 1 and b != -1:
						print("image - chargement")
						self.image = buffer[a:b + 2]
						self.timeStampImage=time.time()
						buffer = buffer[b + 2:]
				
				response.close()
				# Stream stopped or ended: let run() start a new request later
				self.isChargingImage = False
			else:
				print("theta response.status_code _appel: {0}".format(response.status_code))
				response.close()
				self.isChargingImage = False
		except Exception as err:
			print("theta erreur _appel: {0}".format(err))
			self.isChargingImage = False

	def run(self):
		while not self.stopRequest.is_set():
			if (self.mode.value == 2 and self.nbClients.value > 0):
				if (self.isChargingImage == False):
					self._appel()
			else:
				self.isChargingImage = False
			time.sleep(0.5)

The JavaScript part will come soon (its code quality is worse than that of the Python part). Any browser should be able to view the video (but in equirectangular format).

Hugues


Industrial Robot Uses RICOH THETA 360° Live Video To Inspect Tight Places
#9

Thanks for posting your code, Hugues. Really appreciate it.


#10

@jcasman I was thinking of how best to connect the two discussion threads. There’s a description of his business and high-level technical overview at the article below.

I don’t think we should merge the topics, as people are probably searching for “THETA V live streaming Raspberry Pi” and not “industrial robot”. We have an interesting problem of how to help people find information on a technical topic versus a business use topic. Let me know if you have any ideas on how to help people find this type of good information.

I’m going to add a few tags to the topic. That might help.